67305f70691ff1d49f6ecb88d1c1cceb92fd43bf
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           if instance_cfg[instance].auto_balance:
836             needed_mem += instance_cfg[instance].memory
837         if nodeinfo['mfree'] < needed_mem:
838           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
839                       " failovers should node %s fail" % (node, prinode))
840           bad = True
841     return bad
842
843   def CheckPrereq(self):
844     """Check prerequisites.
845
846     Transform the list of checks we're going to skip into a set and check that
847     all its members are valid.
848
849     """
850     self.skip_set = frozenset(self.op.skip_checks)
851     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
852       raise errors.OpPrereqError("Invalid checks to be skipped specified")
853
854   def BuildHooksEnv(self):
855     """Build hooks env.
856
857     Cluster-Verify hooks just rone in the post phase and their failure makes
858     the output be logged in the verify output and the verification to fail.
859
860     """
861     all_nodes = self.cfg.GetNodeList()
862     tags = self.cfg.GetClusterInfo().GetTags()
863     # TODO: populate the environment with useful information for verify hooks
864     env = {
865       "CLUSTER_TAGS": " ".join(tags),
866       }
867     return env, [], all_nodes
868
869   def Exec(self, feedback_fn):
870     """Verify integrity of cluster, performing various test on nodes.
871
872     """
873     bad = False
874     feedback_fn("* Verifying global settings")
875     for msg in self.cfg.VerifyConfig():
876       feedback_fn("  - ERROR: %s" % msg)
877
878     vg_name = self.cfg.GetVGName()
879     nodelist = utils.NiceSort(self.cfg.GetNodeList())
880     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
881     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
882     i_non_redundant = [] # Non redundant instances
883     i_non_a_balanced = [] # Non auto-balanced instances
884     node_volume = {}
885     node_instance = {}
886     node_info = {}
887     instance_cfg = {}
888
889     # FIXME: verify OS list
890     # do local checksums
891     file_names = list(self.sstore.GetFileList())
892     file_names.append(constants.SSL_CERT_FILE)
893     file_names.append(constants.CLUSTER_CONF_FILE)
894     local_checksums = utils.FingerprintFiles(file_names)
895
896     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
897     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
898     all_instanceinfo = rpc.call_instance_list(nodelist)
899     all_vglist = rpc.call_vg_list(nodelist)
900     node_verify_param = {
901       'filelist': file_names,
902       'nodelist': nodelist,
903       'hypervisor': None,
904       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
905                         for node in nodeinfo]
906       }
907     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
908     all_rversion = rpc.call_version(nodelist)
909     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
910
911     incomplete_nodeinfo = False
912
913     for node in nodelist:
914       feedback_fn("* Verifying node %s" % node)
915       result = self._VerifyNode(node, file_names, local_checksums,
916                                 all_vglist[node], all_nvinfo[node],
917                                 all_rversion[node], feedback_fn)
918       bad = bad or result
919
920       # node_volume
921       volumeinfo = all_volumeinfo[node]
922
923       if isinstance(volumeinfo, basestring):
924         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
925                     (node, volumeinfo[-400:].encode('string_escape')))
926         bad = True
927         node_volume[node] = {}
928       elif not isinstance(volumeinfo, dict):
929         feedback_fn("  - ERROR: connection to %s failed" % (node,))
930         bad = True
931         incomplete_nodeinfo = True
932         continue
933       else:
934         node_volume[node] = volumeinfo
935
936       # node_instance
937       nodeinstance = all_instanceinfo[node]
938       if type(nodeinstance) != list:
939         feedback_fn("  - ERROR: connection to %s failed" % (node,))
940         bad = True
941         incomplete_nodeinfo = True
942         continue
943
944       node_instance[node] = nodeinstance
945
946       # node_info
947       nodeinfo = all_ninfo[node]
948       if not isinstance(nodeinfo, dict):
949         feedback_fn("  - ERROR: connection to %s failed" % (node,))
950         bad = True
951         incomplete_nodeinfo = True
952         continue
953
954       try:
955         node_info[node] = {
956           "mfree": int(nodeinfo['memory_free']),
957           "dfree": int(nodeinfo['vg_free']),
958           "pinst": [],
959           "sinst": [],
960           # dictionary holding all instances this node is secondary for,
961           # grouped by their primary node. Each key is a cluster node, and each
962           # value is a list of instances which have the key as primary and the
963           # current node as secondary.  this is handy to calculate N+1 memory
964           # availability if you can only failover from a primary to its
965           # secondary.
966           "sinst-by-pnode": {},
967         }
968       except (ValueError, TypeError):
969         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
970         bad = True
971         incomplete_nodeinfo = True
972         continue
973
974     node_vol_should = {}
975
976     for instance in instancelist:
977       feedback_fn("* Verifying instance %s" % instance)
978       inst_config = self.cfg.GetInstanceInfo(instance)
979       result =  self._VerifyInstance(instance, inst_config, node_volume,
980                                      node_instance, feedback_fn)
981       bad = bad or result
982
983       inst_config.MapLVsByNode(node_vol_should)
984
985       instance_cfg[instance] = inst_config
986
987       pnode = inst_config.primary_node
988       if pnode in node_info:
989         node_info[pnode]['pinst'].append(instance)
990       else:
991         feedback_fn("  - ERROR: instance %s, connection to primary node"
992                     " %s failed" % (instance, pnode))
993         bad = True
994
995       # If the instance is non-redundant we cannot survive losing its primary
996       # node, so we are not N+1 compliant. On the other hand we have no disk
997       # templates with more than one secondary so that situation is not well
998       # supported either.
999       # FIXME: does not support file-backed instances
1000       if len(inst_config.secondary_nodes) == 0:
1001         i_non_redundant.append(instance)
1002       elif len(inst_config.secondary_nodes) > 1:
1003         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1004                     % instance)
1005
1006       if not inst_config.auto_balance:
1007         i_non_a_balanced.append(instance)
1008
1009       for snode in inst_config.secondary_nodes:
1010         if snode in node_info:
1011           node_info[snode]['sinst'].append(instance)
1012           if pnode not in node_info[snode]['sinst-by-pnode']:
1013             node_info[snode]['sinst-by-pnode'][pnode] = []
1014           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1015         else:
1016           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1017                       " %s failed" % (instance, snode))
1018
1019     feedback_fn("* Verifying orphan volumes")
1020     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1021                                        feedback_fn)
1022     bad = bad or result
1023
1024     feedback_fn("* Verifying remaining instances")
1025     result = self._VerifyOrphanInstances(instancelist, node_instance,
1026                                          feedback_fn)
1027     bad = bad or result
1028
1029     if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1030         not incomplete_nodeinfo):
1031       feedback_fn("* Verifying N+1 Memory redundancy")
1032       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1033       bad = bad or result
1034
1035     feedback_fn("* Other Notes")
1036     if i_non_redundant:
1037       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1038                   % len(i_non_redundant))
1039
1040     if i_non_a_balanced:
1041       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1042                   % len(i_non_a_balanced))
1043
1044     return int(bad)
1045
1046   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1047     """Analize the post-hooks' result, handle it, and send some
1048     nicely-formatted feedback back to the user.
1049
1050     Args:
1051       phase: the hooks phase that has just been run
1052       hooks_results: the results of the multi-node hooks rpc call
1053       feedback_fn: function to send feedback back to the caller
1054       lu_result: previous Exec result
1055
1056     """
1057     # We only really run POST phase hooks, and are only interested in their results
1058     if phase == constants.HOOKS_PHASE_POST:
1059       # Used to change hooks' output to proper indentation
1060       indent_re = re.compile('^', re.M)
1061       feedback_fn("* Hooks Results")
1062       if not hooks_results:
1063         feedback_fn("  - ERROR: general communication failure")
1064         lu_result = 1
1065       else:
1066         for node_name in hooks_results:
1067           show_node_header = True
1068           res = hooks_results[node_name]
1069           if res is False or not isinstance(res, list):
1070             feedback_fn("    Communication failure")
1071             lu_result = 1
1072             continue
1073           for script, hkr, output in res:
1074             if hkr == constants.HKR_FAIL:
1075               # The node header is only shown once, if there are
1076               # failing hooks on that node
1077               if show_node_header:
1078                 feedback_fn("  Node %s:" % node_name)
1079                 show_node_header = False
1080               feedback_fn("    ERROR: Script %s failed, output:" % script)
1081               output = indent_re.sub('      ', output)
1082               feedback_fn("%s" % output)
1083               lu_result = 1
1084
1085       return lu_result
1086
1087
1088 class LUVerifyDisks(NoHooksLU):
1089   """Verifies the cluster disks status.
1090
1091   """
1092   _OP_REQP = []
1093
1094   def CheckPrereq(self):
1095     """Check prerequisites.
1096
1097     This has no prerequisites.
1098
1099     """
1100     pass
1101
1102   def Exec(self, feedback_fn):
1103     """Verify integrity of cluster disks.
1104
1105     """
1106     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1107
1108     vg_name = self.cfg.GetVGName()
1109     nodes = utils.NiceSort(self.cfg.GetNodeList())
1110     instances = [self.cfg.GetInstanceInfo(name)
1111                  for name in self.cfg.GetInstanceList()]
1112
1113     nv_dict = {}
1114     for inst in instances:
1115       inst_lvs = {}
1116       if (inst.status != "up" or
1117           inst.disk_template not in constants.DTS_NET_MIRROR):
1118         continue
1119       inst.MapLVsByNode(inst_lvs)
1120       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1121       for node, vol_list in inst_lvs.iteritems():
1122         for vol in vol_list:
1123           nv_dict[(node, vol)] = inst
1124
1125     if not nv_dict:
1126       return result
1127
1128     node_lvs = rpc.call_volume_list(nodes, vg_name)
1129
1130     to_act = set()
1131     for node in nodes:
1132       # node_volume
1133       lvs = node_lvs[node]
1134
1135       if isinstance(lvs, basestring):
1136         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1137         res_nlvm[node] = lvs
1138       elif not isinstance(lvs, dict):
1139         logger.Info("connection to node %s failed or invalid data returned" %
1140                     (node,))
1141         res_nodes.append(node)
1142         continue
1143
1144       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1145         inst = nv_dict.pop((node, lv_name), None)
1146         if (not lv_online and inst is not None
1147             and inst.name not in res_instances):
1148           res_instances.append(inst.name)
1149
1150     # any leftover items in nv_dict are missing LVs, let's arrange the
1151     # data better
1152     for key, inst in nv_dict.iteritems():
1153       if inst.name not in res_missing:
1154         res_missing[inst.name] = []
1155       res_missing[inst.name].append(key)
1156
1157     return result
1158
1159
1160 class LURenameCluster(LogicalUnit):
1161   """Rename the cluster.
1162
1163   """
1164   HPATH = "cluster-rename"
1165   HTYPE = constants.HTYPE_CLUSTER
1166   _OP_REQP = ["name"]
1167
1168   def BuildHooksEnv(self):
1169     """Build hooks env.
1170
1171     """
1172     env = {
1173       "OP_TARGET": self.sstore.GetClusterName(),
1174       "NEW_NAME": self.op.name,
1175       }
1176     mn = self.sstore.GetMasterNode()
1177     return env, [mn], [mn]
1178
1179   def CheckPrereq(self):
1180     """Verify that the passed name is a valid one.
1181
1182     """
1183     hostname = utils.HostInfo(self.op.name)
1184
1185     new_name = hostname.name
1186     self.ip = new_ip = hostname.ip
1187     old_name = self.sstore.GetClusterName()
1188     old_ip = self.sstore.GetMasterIP()
1189     if new_name == old_name and new_ip == old_ip:
1190       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1191                                  " cluster has changed")
1192     if new_ip != old_ip:
1193       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1194         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1195                                    " reachable on the network. Aborting." %
1196                                    new_ip)
1197
1198     self.op.name = new_name
1199
1200   def Exec(self, feedback_fn):
1201     """Rename the cluster.
1202
1203     """
1204     clustername = self.op.name
1205     ip = self.ip
1206     ss = self.sstore
1207
1208     # shutdown the master IP
1209     master = ss.GetMasterNode()
1210     if not rpc.call_node_stop_master(master):
1211       raise errors.OpExecError("Could not disable the master role")
1212
1213     try:
1214       # modify the sstore
1215       ss.SetKey(ss.SS_MASTER_IP, ip)
1216       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1217
1218       # Distribute updated ss config to all nodes
1219       myself = self.cfg.GetNodeInfo(master)
1220       dist_nodes = self.cfg.GetNodeList()
1221       if myself.name in dist_nodes:
1222         dist_nodes.remove(myself.name)
1223
1224       logger.Debug("Copying updated ssconf data to all nodes")
1225       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1226         fname = ss.KeyToFilename(keyname)
1227         result = rpc.call_upload_file(dist_nodes, fname)
1228         for to_node in dist_nodes:
1229           if not result[to_node]:
1230             logger.Error("copy of file %s to node %s failed" %
1231                          (fname, to_node))
1232     finally:
1233       if not rpc.call_node_start_master(master):
1234         logger.Error("Could not re-enable the master role on the master,"
1235                      " please restart manually.")
1236
1237
1238 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1239   """Sleep and poll for an instance's disk to sync.
1240
1241   """
1242   if not instance.disks:
1243     return True
1244
1245   if not oneshot:
1246     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1247
1248   node = instance.primary_node
1249
1250   for dev in instance.disks:
1251     cfgw.SetDiskID(dev, node)
1252
1253   retries = 0
1254   while True:
1255     max_time = 0
1256     done = True
1257     cumul_degraded = False
1258     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1259     if not rstats:
1260       proc.LogWarning("Can't get any data from node %s" % node)
1261       retries += 1
1262       if retries >= 10:
1263         raise errors.RemoteError("Can't contact node %s for mirror data,"
1264                                  " aborting." % node)
1265       time.sleep(6)
1266       continue
1267     retries = 0
1268     for i in range(len(rstats)):
1269       mstat = rstats[i]
1270       if mstat is None:
1271         proc.LogWarning("Can't compute data for node %s/%s" %
1272                         (node, instance.disks[i].iv_name))
1273         continue
1274       # we ignore the ldisk parameter
1275       perc_done, est_time, is_degraded, _ = mstat
1276       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1277       if perc_done is not None:
1278         done = False
1279         if est_time is not None:
1280           rem_time = "%d estimated seconds remaining" % est_time
1281           max_time = est_time
1282         else:
1283           rem_time = "no time estimate"
1284         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1285                      (instance.disks[i].iv_name, perc_done, rem_time))
1286     if done or oneshot:
1287       break
1288
1289     if unlock:
1290       utils.Unlock('cmd')
1291     try:
1292       time.sleep(min(60, max_time))
1293     finally:
1294       if unlock:
1295         utils.Lock('cmd')
1296
1297   if done:
1298     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1299   return not cumul_degraded
1300
1301
1302 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1303   """Check that mirrors are not degraded.
1304
1305   The ldisk parameter, if True, will change the test from the
1306   is_degraded attribute (which represents overall non-ok status for
1307   the device(s)) to the ldisk (representing the local storage status).
1308
1309   """
1310   cfgw.SetDiskID(dev, node)
1311   if ldisk:
1312     idx = 6
1313   else:
1314     idx = 5
1315
1316   result = True
1317   if on_primary or dev.AssembleOnSecondary():
1318     rstats = rpc.call_blockdev_find(node, dev)
1319     if not rstats:
1320       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1321       result = False
1322     else:
1323       result = result and (not rstats[idx])
1324   if dev.children:
1325     for child in dev.children:
1326       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1327
1328   return result
1329
1330
1331 class LUDiagnoseOS(NoHooksLU):
1332   """Logical unit for OS diagnose/query.
1333
1334   """
1335   _OP_REQP = ["output_fields", "names"]
1336
1337   def CheckPrereq(self):
1338     """Check prerequisites.
1339
1340     This always succeeds, since this is a pure query LU.
1341
1342     """
1343     if self.op.names:
1344       raise errors.OpPrereqError("Selective OS query not supported")
1345
1346     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1347     _CheckOutputFields(static=[],
1348                        dynamic=self.dynamic_fields,
1349                        selected=self.op.output_fields)
1350
1351   @staticmethod
1352   def _DiagnoseByOS(node_list, rlist):
1353     """Remaps a per-node return list into an a per-os per-node dictionary
1354
1355       Args:
1356         node_list: a list with the names of all nodes
1357         rlist: a map with node names as keys and OS objects as values
1358
1359       Returns:
1360         map: a map with osnames as keys and as value another map, with
1361              nodes as
1362              keys and list of OS objects as values
1363              e.g. {"debian-etch": {"node1": [<object>,...],
1364                                    "node2": [<object>,]}
1365                   }
1366
1367     """
1368     all_os = {}
1369     for node_name, nr in rlist.iteritems():
1370       if not nr:
1371         continue
1372       for os in nr:
1373         if os.name not in all_os:
1374           # build a list of nodes for this os containing empty lists
1375           # for each node in node_list
1376           all_os[os.name] = {}
1377           for nname in node_list:
1378             all_os[os.name][nname] = []
1379         all_os[os.name][node_name].append(os)
1380     return all_os
1381
1382   def Exec(self, feedback_fn):
1383     """Compute the list of OSes.
1384
1385     """
1386     node_list = self.cfg.GetNodeList()
1387     node_data = rpc.call_os_diagnose(node_list)
1388     if node_data == False:
1389       raise errors.OpExecError("Can't gather the list of OSes")
1390     pol = self._DiagnoseByOS(node_list, node_data)
1391     output = []
1392     for os_name, os_data in pol.iteritems():
1393       row = []
1394       for field in self.op.output_fields:
1395         if field == "name":
1396           val = os_name
1397         elif field == "valid":
1398           val = utils.all([osl and osl[0] for osl in os_data.values()])
1399         elif field == "node_status":
1400           val = {}
1401           for node_name, nos_list in os_data.iteritems():
1402             val[node_name] = [(v.status, v.path) for v in nos_list]
1403         else:
1404           raise errors.ParameterError(field)
1405         row.append(val)
1406       output.append(row)
1407
1408     return output
1409
1410
1411 class LURemoveNode(LogicalUnit):
1412   """Logical unit for removing a node.
1413
1414   """
1415   HPATH = "node-remove"
1416   HTYPE = constants.HTYPE_NODE
1417   _OP_REQP = ["node_name"]
1418
1419   def BuildHooksEnv(self):
1420     """Build hooks env.
1421
1422     This doesn't run on the target node in the pre phase as a failed
1423     node would not allows itself to run.
1424
1425     """
1426     env = {
1427       "OP_TARGET": self.op.node_name,
1428       "NODE_NAME": self.op.node_name,
1429       }
1430     all_nodes = self.cfg.GetNodeList()
1431     all_nodes.remove(self.op.node_name)
1432     return env, all_nodes, all_nodes
1433
1434   def CheckPrereq(self):
1435     """Check prerequisites.
1436
1437     This checks:
1438      - the node exists in the configuration
1439      - it does not have primary or secondary instances
1440      - it's not the master
1441
1442     Any errors are signalled by raising errors.OpPrereqError.
1443
1444     """
1445     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1446     if node is None:
1447       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1448
1449     instance_list = self.cfg.GetInstanceList()
1450
1451     masternode = self.sstore.GetMasterNode()
1452     if node.name == masternode:
1453       raise errors.OpPrereqError("Node is the master node,"
1454                                  " you need to failover first.")
1455
1456     for instance_name in instance_list:
1457       instance = self.cfg.GetInstanceInfo(instance_name)
1458       if node.name == instance.primary_node:
1459         raise errors.OpPrereqError("Instance %s still running on the node,"
1460                                    " please remove first." % instance_name)
1461       if node.name in instance.secondary_nodes:
1462         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1463                                    " please remove first." % instance_name)
1464     self.op.node_name = node.name
1465     self.node = node
1466
1467   def Exec(self, feedback_fn):
1468     """Removes the node from the cluster.
1469
1470     """
1471     node = self.node
1472     logger.Info("stopping the node daemon and removing configs from node %s" %
1473                 node.name)
1474
1475     rpc.call_node_leave_cluster(node.name)
1476
1477     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1478
1479     logger.Info("Removing node %s from config" % node.name)
1480
1481     self.cfg.RemoveNode(node.name)
1482
1483     _RemoveHostFromEtcHosts(node.name)
1484
1485
1486 class LUQueryNodes(NoHooksLU):
1487   """Logical unit for querying nodes.
1488
1489   """
1490   _OP_REQP = ["output_fields", "names"]
1491
1492   def CheckPrereq(self):
1493     """Check prerequisites.
1494
1495     This checks that the fields required are valid output fields.
1496
1497     """
1498     self.dynamic_fields = frozenset([
1499       "dtotal", "dfree",
1500       "mtotal", "mnode", "mfree",
1501       "bootid",
1502       "ctotal",
1503       ])
1504
1505     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1506                                "pinst_list", "sinst_list",
1507                                "pip", "sip", "tags"],
1508                        dynamic=self.dynamic_fields,
1509                        selected=self.op.output_fields)
1510
1511     self.wanted = _GetWantedNodes(self, self.op.names)
1512
1513   def Exec(self, feedback_fn):
1514     """Computes the list of nodes and their attributes.
1515
1516     """
1517     nodenames = self.wanted
1518     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1519
1520     # begin data gathering
1521
1522     if self.dynamic_fields.intersection(self.op.output_fields):
1523       live_data = {}
1524       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1525       for name in nodenames:
1526         nodeinfo = node_data.get(name, None)
1527         if nodeinfo:
1528           live_data[name] = {
1529             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1530             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1531             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1532             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1533             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1534             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1535             "bootid": nodeinfo['bootid'],
1536             }
1537         else:
1538           live_data[name] = {}
1539     else:
1540       live_data = dict.fromkeys(nodenames, {})
1541
1542     node_to_primary = dict([(name, set()) for name in nodenames])
1543     node_to_secondary = dict([(name, set()) for name in nodenames])
1544
1545     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1546                              "sinst_cnt", "sinst_list"))
1547     if inst_fields & frozenset(self.op.output_fields):
1548       instancelist = self.cfg.GetInstanceList()
1549
1550       for instance_name in instancelist:
1551         inst = self.cfg.GetInstanceInfo(instance_name)
1552         if inst.primary_node in node_to_primary:
1553           node_to_primary[inst.primary_node].add(inst.name)
1554         for secnode in inst.secondary_nodes:
1555           if secnode in node_to_secondary:
1556             node_to_secondary[secnode].add(inst.name)
1557
1558     # end data gathering
1559
1560     output = []
1561     for node in nodelist:
1562       node_output = []
1563       for field in self.op.output_fields:
1564         if field == "name":
1565           val = node.name
1566         elif field == "pinst_list":
1567           val = list(node_to_primary[node.name])
1568         elif field == "sinst_list":
1569           val = list(node_to_secondary[node.name])
1570         elif field == "pinst_cnt":
1571           val = len(node_to_primary[node.name])
1572         elif field == "sinst_cnt":
1573           val = len(node_to_secondary[node.name])
1574         elif field == "pip":
1575           val = node.primary_ip
1576         elif field == "sip":
1577           val = node.secondary_ip
1578         elif field == "tags":
1579           val = list(node.GetTags())
1580         elif field in self.dynamic_fields:
1581           val = live_data[node.name].get(field, None)
1582         else:
1583           raise errors.ParameterError(field)
1584         node_output.append(val)
1585       output.append(node_output)
1586
1587     return output
1588
1589
1590 class LUQueryNodeVolumes(NoHooksLU):
1591   """Logical unit for getting volumes on node(s).
1592
1593   """
1594   _OP_REQP = ["nodes", "output_fields"]
1595
1596   def CheckPrereq(self):
1597     """Check prerequisites.
1598
1599     This checks that the fields required are valid output fields.
1600
1601     """
1602     self.nodes = _GetWantedNodes(self, self.op.nodes)
1603
1604     _CheckOutputFields(static=["node"],
1605                        dynamic=["phys", "vg", "name", "size", "instance"],
1606                        selected=self.op.output_fields)
1607
1608
1609   def Exec(self, feedback_fn):
1610     """Computes the list of nodes and their attributes.
1611
1612     """
1613     nodenames = self.nodes
1614     volumes = rpc.call_node_volumes(nodenames)
1615
1616     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1617              in self.cfg.GetInstanceList()]
1618
1619     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1620
1621     output = []
1622     for node in nodenames:
1623       if node not in volumes or not volumes[node]:
1624         continue
1625
1626       node_vols = volumes[node][:]
1627       node_vols.sort(key=lambda vol: vol['dev'])
1628
1629       for vol in node_vols:
1630         node_output = []
1631         for field in self.op.output_fields:
1632           if field == "node":
1633             val = node
1634           elif field == "phys":
1635             val = vol['dev']
1636           elif field == "vg":
1637             val = vol['vg']
1638           elif field == "name":
1639             val = vol['name']
1640           elif field == "size":
1641             val = int(float(vol['size']))
1642           elif field == "instance":
1643             for inst in ilist:
1644               if node not in lv_by_node[inst]:
1645                 continue
1646               if vol['name'] in lv_by_node[inst][node]:
1647                 val = inst.name
1648                 break
1649             else:
1650               val = '-'
1651           else:
1652             raise errors.ParameterError(field)
1653           node_output.append(str(val))
1654
1655         output.append(node_output)
1656
1657     return output
1658
1659
1660 class LUAddNode(LogicalUnit):
1661   """Logical unit for adding node to the cluster.
1662
1663   """
1664   HPATH = "node-add"
1665   HTYPE = constants.HTYPE_NODE
1666   _OP_REQP = ["node_name"]
1667
1668   def BuildHooksEnv(self):
1669     """Build hooks env.
1670
1671     This will run on all nodes before, and on all nodes + the new node after.
1672
1673     """
1674     env = {
1675       "OP_TARGET": self.op.node_name,
1676       "NODE_NAME": self.op.node_name,
1677       "NODE_PIP": self.op.primary_ip,
1678       "NODE_SIP": self.op.secondary_ip,
1679       }
1680     nodes_0 = self.cfg.GetNodeList()
1681     nodes_1 = nodes_0 + [self.op.node_name, ]
1682     return env, nodes_0, nodes_1
1683
1684   def CheckPrereq(self):
1685     """Check prerequisites.
1686
1687     This checks:
1688      - the new node is not already in the config
1689      - it is resolvable
1690      - its parameters (single/dual homed) matches the cluster
1691
1692     Any errors are signalled by raising errors.OpPrereqError.
1693
1694     """
1695     node_name = self.op.node_name
1696     cfg = self.cfg
1697
1698     dns_data = utils.HostInfo(node_name)
1699
1700     node = dns_data.name
1701     primary_ip = self.op.primary_ip = dns_data.ip
1702     secondary_ip = getattr(self.op, "secondary_ip", None)
1703     if secondary_ip is None:
1704       secondary_ip = primary_ip
1705     if not utils.IsValidIP(secondary_ip):
1706       raise errors.OpPrereqError("Invalid secondary IP given")
1707     self.op.secondary_ip = secondary_ip
1708
1709     node_list = cfg.GetNodeList()
1710     if not self.op.readd and node in node_list:
1711       raise errors.OpPrereqError("Node %s is already in the configuration" %
1712                                  node)
1713     elif self.op.readd and node not in node_list:
1714       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1715
1716     for existing_node_name in node_list:
1717       existing_node = cfg.GetNodeInfo(existing_node_name)
1718
1719       if self.op.readd and node == existing_node_name:
1720         if (existing_node.primary_ip != primary_ip or
1721             existing_node.secondary_ip != secondary_ip):
1722           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1723                                      " address configuration as before")
1724         continue
1725
1726       if (existing_node.primary_ip == primary_ip or
1727           existing_node.secondary_ip == primary_ip or
1728           existing_node.primary_ip == secondary_ip or
1729           existing_node.secondary_ip == secondary_ip):
1730         raise errors.OpPrereqError("New node ip address(es) conflict with"
1731                                    " existing node %s" % existing_node.name)
1732
1733     # check that the type of the node (single versus dual homed) is the
1734     # same as for the master
1735     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1736     master_singlehomed = myself.secondary_ip == myself.primary_ip
1737     newbie_singlehomed = secondary_ip == primary_ip
1738     if master_singlehomed != newbie_singlehomed:
1739       if master_singlehomed:
1740         raise errors.OpPrereqError("The master has no private ip but the"
1741                                    " new node has one")
1742       else:
1743         raise errors.OpPrereqError("The master has a private ip but the"
1744                                    " new node doesn't have one")
1745
1746     # checks reachablity
1747     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1748       raise errors.OpPrereqError("Node not reachable by ping")
1749
1750     if not newbie_singlehomed:
1751       # check reachability from my secondary ip to newbie's secondary ip
1752       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1753                            source=myself.secondary_ip):
1754         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1755                                    " based ping to noded port")
1756
1757     self.new_node = objects.Node(name=node,
1758                                  primary_ip=primary_ip,
1759                                  secondary_ip=secondary_ip)
1760
1761     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1762       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1763         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1764                                    constants.VNC_PASSWORD_FILE)
1765
1766   def Exec(self, feedback_fn):
1767     """Adds the new node to the cluster.
1768
1769     """
1770     new_node = self.new_node
1771     node = new_node.name
1772
1773     # set up inter-node password and certificate and restarts the node daemon
1774     gntpass = self.sstore.GetNodeDaemonPassword()
1775     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1776       raise errors.OpExecError("ganeti password corruption detected")
1777     f = open(constants.SSL_CERT_FILE)
1778     try:
1779       gntpem = f.read(8192)
1780     finally:
1781       f.close()
1782     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1783     # so we use this to detect an invalid certificate; as long as the
1784     # cert doesn't contain this, the here-document will be correctly
1785     # parsed by the shell sequence below
1786     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1787       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1788     if not gntpem.endswith("\n"):
1789       raise errors.OpExecError("PEM must end with newline")
1790     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1791
1792     # and then connect with ssh to set password and start ganeti-noded
1793     # note that all the below variables are sanitized at this point,
1794     # either by being constants or by the checks above
1795     ss = self.sstore
1796     mycommand = ("umask 077 && "
1797                  "echo '%s' > '%s' && "
1798                  "cat > '%s' << '!EOF.' && \n"
1799                  "%s!EOF.\n%s restart" %
1800                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1801                   constants.SSL_CERT_FILE, gntpem,
1802                   constants.NODE_INITD_SCRIPT))
1803
1804     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1805     if result.failed:
1806       raise errors.OpExecError("Remote command on node %s, error: %s,"
1807                                " output: %s" %
1808                                (node, result.fail_reason, result.output))
1809
1810     # check connectivity
1811     time.sleep(4)
1812
1813     result = rpc.call_version([node])[node]
1814     if result:
1815       if constants.PROTOCOL_VERSION == result:
1816         logger.Info("communication to node %s fine, sw version %s match" %
1817                     (node, result))
1818       else:
1819         raise errors.OpExecError("Version mismatch master version %s,"
1820                                  " node version %s" %
1821                                  (constants.PROTOCOL_VERSION, result))
1822     else:
1823       raise errors.OpExecError("Cannot get version from the new node")
1824
1825     # setup ssh on node
1826     logger.Info("copy ssh key to node %s" % node)
1827     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1828     keyarray = []
1829     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1830                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1831                 priv_key, pub_key]
1832
1833     for i in keyfiles:
1834       f = open(i, 'r')
1835       try:
1836         keyarray.append(f.read())
1837       finally:
1838         f.close()
1839
1840     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1841                                keyarray[3], keyarray[4], keyarray[5])
1842
1843     if not result:
1844       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1845
1846     # Add node to our /etc/hosts, and add key to known_hosts
1847     _AddHostToEtcHosts(new_node.name)
1848
1849     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1850                       self.cfg.GetHostKey())
1851
1852     if new_node.secondary_ip != new_node.primary_ip:
1853       if not rpc.call_node_tcp_ping(new_node.name,
1854                                     constants.LOCALHOST_IP_ADDRESS,
1855                                     new_node.secondary_ip,
1856                                     constants.DEFAULT_NODED_PORT,
1857                                     10, False):
1858         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1859                                  " you gave (%s). Please fix and re-run this"
1860                                  " command." % new_node.secondary_ip)
1861
1862     success, msg = ssh.VerifyNodeHostname(node)
1863     if not success:
1864       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1865                                " than the one the resolver gives: %s."
1866                                " Please fix and re-run this command." %
1867                                (node, msg))
1868
1869     # Distribute updated /etc/hosts and known_hosts to all nodes,
1870     # including the node just added
1871     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1872     dist_nodes = self.cfg.GetNodeList()
1873     if not self.op.readd:
1874       dist_nodes.append(node)
1875     if myself.name in dist_nodes:
1876       dist_nodes.remove(myself.name)
1877
1878     logger.Debug("Copying hosts and known_hosts to all nodes")
1879     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1880       result = rpc.call_upload_file(dist_nodes, fname)
1881       for to_node in dist_nodes:
1882         if not result[to_node]:
1883           logger.Error("copy of file %s to node %s failed" %
1884                        (fname, to_node))
1885
1886     to_copy = ss.GetFileList()
1887     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1888       to_copy.append(constants.VNC_PASSWORD_FILE)
1889     for fname in to_copy:
1890       if not ssh.CopyFileToNode(node, fname):
1891         logger.Error("could not copy file %s to node %s" % (fname, node))
1892
1893     if not self.op.readd:
1894       logger.Info("adding node %s to cluster.conf" % node)
1895       self.cfg.AddNode(new_node)
1896
1897
1898 class LUMasterFailover(LogicalUnit):
1899   """Failover the master node to the current node.
1900
1901   This is a special LU in that it must run on a non-master node.
1902
1903   """
1904   HPATH = "master-failover"
1905   HTYPE = constants.HTYPE_CLUSTER
1906   REQ_MASTER = False
1907   _OP_REQP = []
1908
1909   def BuildHooksEnv(self):
1910     """Build hooks env.
1911
1912     This will run on the new master only in the pre phase, and on all
1913     the nodes in the post phase.
1914
1915     """
1916     env = {
1917       "OP_TARGET": self.new_master,
1918       "NEW_MASTER": self.new_master,
1919       "OLD_MASTER": self.old_master,
1920       }
1921     return env, [self.new_master], self.cfg.GetNodeList()
1922
1923   def CheckPrereq(self):
1924     """Check prerequisites.
1925
1926     This checks that we are not already the master.
1927
1928     """
1929     self.new_master = utils.HostInfo().name
1930     self.old_master = self.sstore.GetMasterNode()
1931
1932     if self.old_master == self.new_master:
1933       raise errors.OpPrereqError("This commands must be run on the node"
1934                                  " where you want the new master to be."
1935                                  " %s is already the master" %
1936                                  self.old_master)
1937
1938   def Exec(self, feedback_fn):
1939     """Failover the master node.
1940
1941     This command, when run on a non-master node, will cause the current
1942     master to cease being master, and the non-master to become new
1943     master.
1944
1945     """
1946     #TODO: do not rely on gethostname returning the FQDN
1947     logger.Info("setting master to %s, old master: %s" %
1948                 (self.new_master, self.old_master))
1949
1950     if not rpc.call_node_stop_master(self.old_master):
1951       logger.Error("could disable the master role on the old master"
1952                    " %s, please disable manually" % self.old_master)
1953
1954     ss = self.sstore
1955     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1956     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1957                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1958       logger.Error("could not distribute the new simple store master file"
1959                    " to the other nodes, please check.")
1960
1961     if not rpc.call_node_start_master(self.new_master):
1962       logger.Error("could not start the master role on the new master"
1963                    " %s, please check" % self.new_master)
1964       feedback_fn("Error in activating the master IP on the new master,"
1965                   " please fix manually.")
1966
1967
1968
1969 class LUQueryClusterInfo(NoHooksLU):
1970   """Query cluster configuration.
1971
1972   """
1973   _OP_REQP = []
1974   REQ_MASTER = False
1975
1976   def CheckPrereq(self):
1977     """No prerequsites needed for this LU.
1978
1979     """
1980     pass
1981
1982   def Exec(self, feedback_fn):
1983     """Return cluster config.
1984
1985     """
1986     result = {
1987       "name": self.sstore.GetClusterName(),
1988       "software_version": constants.RELEASE_VERSION,
1989       "protocol_version": constants.PROTOCOL_VERSION,
1990       "config_version": constants.CONFIG_VERSION,
1991       "os_api_version": constants.OS_API_VERSION,
1992       "export_version": constants.EXPORT_VERSION,
1993       "master": self.sstore.GetMasterNode(),
1994       "architecture": (platform.architecture()[0], platform.machine()),
1995       "hypervisor_type": self.sstore.GetHypervisorType(),
1996       }
1997
1998     return result
1999
2000
2001 class LUClusterCopyFile(NoHooksLU):
2002   """Copy file to cluster.
2003
2004   """
2005   _OP_REQP = ["nodes", "filename"]
2006
2007   def CheckPrereq(self):
2008     """Check prerequisites.
2009
2010     It should check that the named file exists and that the given list
2011     of nodes is valid.
2012
2013     """
2014     if not os.path.exists(self.op.filename):
2015       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2016
2017     self.nodes = _GetWantedNodes(self, self.op.nodes)
2018
2019   def Exec(self, feedback_fn):
2020     """Copy a file from master to some nodes.
2021
2022     Args:
2023       opts - class with options as members
2024       args - list containing a single element, the file name
2025     Opts used:
2026       nodes - list containing the name of target nodes; if empty, all nodes
2027
2028     """
2029     filename = self.op.filename
2030
2031     myname = utils.HostInfo().name
2032
2033     for node in self.nodes:
2034       if node == myname:
2035         continue
2036       if not ssh.CopyFileToNode(node, filename):
2037         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2038
2039
2040 class LUDumpClusterConfig(NoHooksLU):
2041   """Return a text-representation of the cluster-config.
2042
2043   """
2044   _OP_REQP = []
2045
2046   def CheckPrereq(self):
2047     """No prerequisites.
2048
2049     """
2050     pass
2051
2052   def Exec(self, feedback_fn):
2053     """Dump a representation of the cluster config to the standard output.
2054
2055     """
2056     return self.cfg.DumpConfig()
2057
2058
2059 class LURunClusterCommand(NoHooksLU):
2060   """Run a command on some nodes.
2061
2062   """
2063   _OP_REQP = ["command", "nodes"]
2064
2065   def CheckPrereq(self):
2066     """Check prerequisites.
2067
2068     It checks that the given list of nodes is valid.
2069
2070     """
2071     self.nodes = _GetWantedNodes(self, self.op.nodes)
2072
2073   def Exec(self, feedback_fn):
2074     """Run a command on some nodes.
2075
2076     """
2077     # put the master at the end of the nodes list
2078     master_node = self.sstore.GetMasterNode()
2079     if master_node in self.nodes:
2080       self.nodes.remove(master_node)
2081       self.nodes.append(master_node)
2082
2083     data = []
2084     for node in self.nodes:
2085       result = ssh.SSHCall(node, "root", self.op.command)
2086       data.append((node, result.output, result.exit_code))
2087
2088     return data
2089
2090
2091 class LUActivateInstanceDisks(NoHooksLU):
2092   """Bring up an instance's disks.
2093
2094   """
2095   _OP_REQP = ["instance_name"]
2096
2097   def CheckPrereq(self):
2098     """Check prerequisites.
2099
2100     This checks that the instance is in the cluster.
2101
2102     """
2103     instance = self.cfg.GetInstanceInfo(
2104       self.cfg.ExpandInstanceName(self.op.instance_name))
2105     if instance is None:
2106       raise errors.OpPrereqError("Instance '%s' not known" %
2107                                  self.op.instance_name)
2108     self.instance = instance
2109
2110
2111   def Exec(self, feedback_fn):
2112     """Activate the disks.
2113
2114     """
2115     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2116     if not disks_ok:
2117       raise errors.OpExecError("Cannot activate block devices")
2118
2119     return disks_info
2120
2121
2122 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2123   """Prepare the block devices for an instance.
2124
2125   This sets up the block devices on all nodes.
2126
2127   Args:
2128     instance: a ganeti.objects.Instance object
2129     ignore_secondaries: if true, errors on secondary nodes won't result
2130                         in an error return from the function
2131
2132   Returns:
2133     false if the operation failed
2134     list of (host, instance_visible_name, node_visible_name) if the operation
2135          suceeded with the mapping from node devices to instance devices
2136   """
2137   device_info = []
2138   disks_ok = True
2139   iname = instance.name
2140   # With the two passes mechanism we try to reduce the window of
2141   # opportunity for the race condition of switching DRBD to primary
2142   # before handshaking occured, but we do not eliminate it
2143
2144   # The proper fix would be to wait (with some limits) until the
2145   # connection has been made and drbd transitions from WFConnection
2146   # into any other network-connected state (Connected, SyncTarget,
2147   # SyncSource, etc.)
2148
2149   # 1st pass, assemble on all nodes in secondary mode
2150   for inst_disk in instance.disks:
2151     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2152       cfg.SetDiskID(node_disk, node)
2153       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2154       if not result:
2155         logger.Error("could not prepare block device %s on node %s"
2156                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2157         if not ignore_secondaries:
2158           disks_ok = False
2159
2160   # FIXME: race condition on drbd migration to primary
2161
2162   # 2nd pass, do only the primary node
2163   for inst_disk in instance.disks:
2164     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2165       if node != instance.primary_node:
2166         continue
2167       cfg.SetDiskID(node_disk, node)
2168       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2169       if not result:
2170         logger.Error("could not prepare block device %s on node %s"
2171                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2172         disks_ok = False
2173     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2174
2175   # leave the disks configured for the primary node
2176   # this is a workaround that would be fixed better by
2177   # improving the logical/physical id handling
2178   for disk in instance.disks:
2179     cfg.SetDiskID(disk, instance.primary_node)
2180
2181   return disks_ok, device_info
2182
2183
2184 def _StartInstanceDisks(cfg, instance, force):
2185   """Start the disks of an instance.
2186
2187   """
2188   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2189                                            ignore_secondaries=force)
2190   if not disks_ok:
2191     _ShutdownInstanceDisks(instance, cfg)
2192     if force is not None and not force:
2193       logger.Error("If the message above refers to a secondary node,"
2194                    " you can retry the operation using '--force'.")
2195     raise errors.OpExecError("Disk consistency error")
2196
2197
2198 class LUDeactivateInstanceDisks(NoHooksLU):
2199   """Shutdown an instance's disks.
2200
2201   """
2202   _OP_REQP = ["instance_name"]
2203
2204   def CheckPrereq(self):
2205     """Check prerequisites.
2206
2207     This checks that the instance is in the cluster.
2208
2209     """
2210     instance = self.cfg.GetInstanceInfo(
2211       self.cfg.ExpandInstanceName(self.op.instance_name))
2212     if instance is None:
2213       raise errors.OpPrereqError("Instance '%s' not known" %
2214                                  self.op.instance_name)
2215     self.instance = instance
2216
2217   def Exec(self, feedback_fn):
2218     """Deactivate the disks
2219
2220     """
2221     instance = self.instance
2222     ins_l = rpc.call_instance_list([instance.primary_node])
2223     ins_l = ins_l[instance.primary_node]
2224     if not type(ins_l) is list:
2225       raise errors.OpExecError("Can't contact node '%s'" %
2226                                instance.primary_node)
2227
2228     if self.instance.name in ins_l:
2229       raise errors.OpExecError("Instance is running, can't shutdown"
2230                                " block devices.")
2231
2232     _ShutdownInstanceDisks(instance, self.cfg)
2233
2234
2235 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2236   """Shutdown block devices of an instance.
2237
2238   This does the shutdown on all nodes of the instance.
2239
2240   If the ignore_primary is false, errors on the primary node are
2241   ignored.
2242
2243   """
2244   result = True
2245   for disk in instance.disks:
2246     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2247       cfg.SetDiskID(top_disk, node)
2248       if not rpc.call_blockdev_shutdown(node, top_disk):
2249         logger.Error("could not shutdown block device %s on node %s" %
2250                      (disk.iv_name, node))
2251         if not ignore_primary or node != instance.primary_node:
2252           result = False
2253   return result
2254
2255
2256 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2257   """Checks if a node has enough free memory.
2258
2259   This function check if a given node has the needed amount of free
2260   memory. In case the node has less memory or we cannot get the
2261   information from the node, this function raise an OpPrereqError
2262   exception.
2263
2264   Args:
2265     - cfg: a ConfigWriter instance
2266     - node: the node name
2267     - reason: string to use in the error message
2268     - requested: the amount of memory in MiB
2269
2270   """
2271   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2272   if not (nodeinfo and isinstance(nodeinfo, dict) and
2273           node in nodeinfo and isinstance(nodeinfo[node], dict)):
2274     raise errors.OpPrereqError("Could not contact node %s for resource"
2275                              " information" % (node,))
2276
2277   free_mem = nodeinfo[node].get('memory_free')
2278   if not isinstance(free_mem, int):
2279     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2280                              " was '%s'" % (node, free_mem))
2281   if requested > free_mem:
2282     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2283                              " needed %s MiB, available %s MiB" %
2284                              (node, reason, requested, free_mem))
2285
2286
2287 class LUStartupInstance(LogicalUnit):
2288   """Starts an instance.
2289
2290   """
2291   HPATH = "instance-start"
2292   HTYPE = constants.HTYPE_INSTANCE
2293   _OP_REQP = ["instance_name", "force"]
2294
2295   def BuildHooksEnv(self):
2296     """Build hooks env.
2297
2298     This runs on master, primary and secondary nodes of the instance.
2299
2300     """
2301     env = {
2302       "FORCE": self.op.force,
2303       }
2304     env.update(_BuildInstanceHookEnvByObject(self.instance))
2305     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2306           list(self.instance.secondary_nodes))
2307     return env, nl, nl
2308
2309   def CheckPrereq(self):
2310     """Check prerequisites.
2311
2312     This checks that the instance is in the cluster.
2313
2314     """
2315     instance = self.cfg.GetInstanceInfo(
2316       self.cfg.ExpandInstanceName(self.op.instance_name))
2317     if instance is None:
2318       raise errors.OpPrereqError("Instance '%s' not known" %
2319                                  self.op.instance_name)
2320
2321     # check bridges existance
2322     _CheckInstanceBridgesExist(instance)
2323
2324     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2325                          "starting instance %s" % instance.name,
2326                          instance.memory)
2327
2328     self.instance = instance
2329     self.op.instance_name = instance.name
2330
2331   def Exec(self, feedback_fn):
2332     """Start the instance.
2333
2334     """
2335     instance = self.instance
2336     force = self.op.force
2337     extra_args = getattr(self.op, "extra_args", "")
2338
2339     self.cfg.MarkInstanceUp(instance.name)
2340
2341     node_current = instance.primary_node
2342
2343     _StartInstanceDisks(self.cfg, instance, force)
2344
2345     if not rpc.call_instance_start(node_current, instance, extra_args):
2346       _ShutdownInstanceDisks(instance, self.cfg)
2347       raise errors.OpExecError("Could not start instance")
2348
2349
2350 class LURebootInstance(LogicalUnit):
2351   """Reboot an instance.
2352
2353   """
2354   HPATH = "instance-reboot"
2355   HTYPE = constants.HTYPE_INSTANCE
2356   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2357
2358   def BuildHooksEnv(self):
2359     """Build hooks env.
2360
2361     This runs on master, primary and secondary nodes of the instance.
2362
2363     """
2364     env = {
2365       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2366       }
2367     env.update(_BuildInstanceHookEnvByObject(self.instance))
2368     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2369           list(self.instance.secondary_nodes))
2370     return env, nl, nl
2371
2372   def CheckPrereq(self):
2373     """Check prerequisites.
2374
2375     This checks that the instance is in the cluster.
2376
2377     """
2378     instance = self.cfg.GetInstanceInfo(
2379       self.cfg.ExpandInstanceName(self.op.instance_name))
2380     if instance is None:
2381       raise errors.OpPrereqError("Instance '%s' not known" %
2382                                  self.op.instance_name)
2383
2384     # check bridges existance
2385     _CheckInstanceBridgesExist(instance)
2386
2387     self.instance = instance
2388     self.op.instance_name = instance.name
2389
2390   def Exec(self, feedback_fn):
2391     """Reboot the instance.
2392
2393     """
2394     instance = self.instance
2395     ignore_secondaries = self.op.ignore_secondaries
2396     reboot_type = self.op.reboot_type
2397     extra_args = getattr(self.op, "extra_args", "")
2398
2399     node_current = instance.primary_node
2400
2401     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2402                            constants.INSTANCE_REBOOT_HARD,
2403                            constants.INSTANCE_REBOOT_FULL]:
2404       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2405                                   (constants.INSTANCE_REBOOT_SOFT,
2406                                    constants.INSTANCE_REBOOT_HARD,
2407                                    constants.INSTANCE_REBOOT_FULL))
2408
2409     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2410                        constants.INSTANCE_REBOOT_HARD]:
2411       if not rpc.call_instance_reboot(node_current, instance,
2412                                       reboot_type, extra_args):
2413         raise errors.OpExecError("Could not reboot instance")
2414     else:
2415       if not rpc.call_instance_shutdown(node_current, instance):
2416         raise errors.OpExecError("could not shutdown instance for full reboot")
2417       _ShutdownInstanceDisks(instance, self.cfg)
2418       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2419       if not rpc.call_instance_start(node_current, instance, extra_args):
2420         _ShutdownInstanceDisks(instance, self.cfg)
2421         raise errors.OpExecError("Could not start instance for full reboot")
2422
2423     self.cfg.MarkInstanceUp(instance.name)
2424
2425
2426 class LUShutdownInstance(LogicalUnit):
2427   """Shutdown an instance.
2428
2429   """
2430   HPATH = "instance-stop"
2431   HTYPE = constants.HTYPE_INSTANCE
2432   _OP_REQP = ["instance_name"]
2433
2434   def BuildHooksEnv(self):
2435     """Build hooks env.
2436
2437     This runs on master, primary and secondary nodes of the instance.
2438
2439     """
2440     env = _BuildInstanceHookEnvByObject(self.instance)
2441     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2442           list(self.instance.secondary_nodes))
2443     return env, nl, nl
2444
2445   def CheckPrereq(self):
2446     """Check prerequisites.
2447
2448     This checks that the instance is in the cluster.
2449
2450     """
2451     instance = self.cfg.GetInstanceInfo(
2452       self.cfg.ExpandInstanceName(self.op.instance_name))
2453     if instance is None:
2454       raise errors.OpPrereqError("Instance '%s' not known" %
2455                                  self.op.instance_name)
2456     self.instance = instance
2457
2458   def Exec(self, feedback_fn):
2459     """Shutdown the instance.
2460
2461     """
2462     instance = self.instance
2463     node_current = instance.primary_node
2464     self.cfg.MarkInstanceDown(instance.name)
2465     if not rpc.call_instance_shutdown(node_current, instance):
2466       logger.Error("could not shutdown instance")
2467
2468     _ShutdownInstanceDisks(instance, self.cfg)
2469
2470
2471 class LUReinstallInstance(LogicalUnit):
2472   """Reinstall an instance.
2473
2474   """
2475   HPATH = "instance-reinstall"
2476   HTYPE = constants.HTYPE_INSTANCE
2477   _OP_REQP = ["instance_name"]
2478
2479   def BuildHooksEnv(self):
2480     """Build hooks env.
2481
2482     This runs on master, primary and secondary nodes of the instance.
2483
2484     """
2485     env = _BuildInstanceHookEnvByObject(self.instance)
2486     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2487           list(self.instance.secondary_nodes))
2488     return env, nl, nl
2489
2490   def CheckPrereq(self):
2491     """Check prerequisites.
2492
2493     This checks that the instance is in the cluster and is not running.
2494
2495     """
2496     instance = self.cfg.GetInstanceInfo(
2497       self.cfg.ExpandInstanceName(self.op.instance_name))
2498     if instance is None:
2499       raise errors.OpPrereqError("Instance '%s' not known" %
2500                                  self.op.instance_name)
2501     if instance.disk_template == constants.DT_DISKLESS:
2502       raise errors.OpPrereqError("Instance '%s' has no disks" %
2503                                  self.op.instance_name)
2504     if instance.status != "down":
2505       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2506                                  self.op.instance_name)
2507     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2508     if remote_info:
2509       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2510                                  (self.op.instance_name,
2511                                   instance.primary_node))
2512
2513     self.op.os_type = getattr(self.op, "os_type", None)
2514     if self.op.os_type is not None:
2515       # OS verification
2516       pnode = self.cfg.GetNodeInfo(
2517         self.cfg.ExpandNodeName(instance.primary_node))
2518       if pnode is None:
2519         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2520                                    self.op.pnode)
2521       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2522       if not os_obj:
2523         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2524                                    " primary node"  % self.op.os_type)
2525
2526     self.instance = instance
2527
2528   def Exec(self, feedback_fn):
2529     """Reinstall the instance.
2530
2531     """
2532     inst = self.instance
2533
2534     if self.op.os_type is not None:
2535       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2536       inst.os = self.op.os_type
2537       self.cfg.AddInstance(inst)
2538
2539     _StartInstanceDisks(self.cfg, inst, None)
2540     try:
2541       feedback_fn("Running the instance OS create scripts...")
2542       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2543         raise errors.OpExecError("Could not install OS for instance %s"
2544                                  " on node %s" %
2545                                  (inst.name, inst.primary_node))
2546     finally:
2547       _ShutdownInstanceDisks(inst, self.cfg)
2548
2549
2550 class LURenameInstance(LogicalUnit):
2551   """Rename an instance.
2552
2553   """
2554   HPATH = "instance-rename"
2555   HTYPE = constants.HTYPE_INSTANCE
2556   _OP_REQP = ["instance_name", "new_name"]
2557
2558   def BuildHooksEnv(self):
2559     """Build hooks env.
2560
2561     This runs on master, primary and secondary nodes of the instance.
2562
2563     """
2564     env = _BuildInstanceHookEnvByObject(self.instance)
2565     env["INSTANCE_NEW_NAME"] = self.op.new_name
2566     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2567           list(self.instance.secondary_nodes))
2568     return env, nl, nl
2569
2570   def CheckPrereq(self):
2571     """Check prerequisites.
2572
2573     This checks that the instance is in the cluster and is not running.
2574
2575     """
2576     instance = self.cfg.GetInstanceInfo(
2577       self.cfg.ExpandInstanceName(self.op.instance_name))
2578     if instance is None:
2579       raise errors.OpPrereqError("Instance '%s' not known" %
2580                                  self.op.instance_name)
2581     if instance.status != "down":
2582       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2583                                  self.op.instance_name)
2584     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2585     if remote_info:
2586       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2587                                  (self.op.instance_name,
2588                                   instance.primary_node))
2589     self.instance = instance
2590
2591     # new name verification
2592     name_info = utils.HostInfo(self.op.new_name)
2593
2594     self.op.new_name = new_name = name_info.name
2595     instance_list = self.cfg.GetInstanceList()
2596     if new_name in instance_list:
2597       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2598                                  new_name)
2599
2600     if not getattr(self.op, "ignore_ip", False):
2601       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2602         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2603                                    (name_info.ip, new_name))
2604
2605
2606   def Exec(self, feedback_fn):
2607     """Reinstall the instance.
2608
2609     """
2610     inst = self.instance
2611     old_name = inst.name
2612
2613     self.cfg.RenameInstance(inst.name, self.op.new_name)
2614
2615     # re-read the instance from the configuration after rename
2616     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2617
2618     _StartInstanceDisks(self.cfg, inst, None)
2619     try:
2620       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2621                                           "sda", "sdb"):
2622         msg = ("Could not run OS rename script for instance %s on node %s"
2623                " (but the instance has been renamed in Ganeti)" %
2624                (inst.name, inst.primary_node))
2625         logger.Error(msg)
2626     finally:
2627       _ShutdownInstanceDisks(inst, self.cfg)
2628
2629
2630 class LURemoveInstance(LogicalUnit):
2631   """Remove an instance.
2632
2633   """
2634   HPATH = "instance-remove"
2635   HTYPE = constants.HTYPE_INSTANCE
2636   _OP_REQP = ["instance_name", "ignore_failures"]
2637
2638   def BuildHooksEnv(self):
2639     """Build hooks env.
2640
2641     This runs on master, primary and secondary nodes of the instance.
2642
2643     """
2644     env = _BuildInstanceHookEnvByObject(self.instance)
2645     nl = [self.sstore.GetMasterNode()]
2646     return env, nl, nl
2647
2648   def CheckPrereq(self):
2649     """Check prerequisites.
2650
2651     This checks that the instance is in the cluster.
2652
2653     """
2654     instance = self.cfg.GetInstanceInfo(
2655       self.cfg.ExpandInstanceName(self.op.instance_name))
2656     if instance is None:
2657       raise errors.OpPrereqError("Instance '%s' not known" %
2658                                  self.op.instance_name)
2659     self.instance = instance
2660
2661   def Exec(self, feedback_fn):
2662     """Remove the instance.
2663
2664     """
2665     instance = self.instance
2666     logger.Info("shutting down instance %s on node %s" %
2667                 (instance.name, instance.primary_node))
2668
2669     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2670       if self.op.ignore_failures:
2671         feedback_fn("Warning: can't shutdown instance")
2672       else:
2673         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2674                                  (instance.name, instance.primary_node))
2675
2676     logger.Info("removing block devices for instance %s" % instance.name)
2677
2678     if not _RemoveDisks(instance, self.cfg):
2679       if self.op.ignore_failures:
2680         feedback_fn("Warning: can't remove instance's disks")
2681       else:
2682         raise errors.OpExecError("Can't remove instance's disks")
2683
2684     logger.Info("removing instance %s out of cluster config" % instance.name)
2685
2686     self.cfg.RemoveInstance(instance.name)
2687
2688
2689 class LUQueryInstances(NoHooksLU):
2690   """Logical unit for querying instances.
2691
2692   """
2693   _OP_REQP = ["output_fields", "names"]
2694
2695   def CheckPrereq(self):
2696     """Check prerequisites.
2697
2698     This checks that the fields required are valid output fields.
2699
2700     """
2701     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2702     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2703                                "admin_state", "admin_ram",
2704                                "disk_template", "ip", "mac", "bridge",
2705                                "sda_size", "sdb_size", "vcpus", "tags",
2706                                "auto_balance",
2707                                "network_port", "kernel_path", "initrd_path",
2708                                "hvm_boot_order", "hvm_acpi", "hvm_pae",
2709                                "hvm_cdrom_image_path", "hvm_nic_type",
2710                                "hvm_disk_type", "vnc_bind_address"],
2711                        dynamic=self.dynamic_fields,
2712                        selected=self.op.output_fields)
2713
2714
2715     self.wanted = _GetWantedInstances(self, self.op.names)
2716
2717   def Exec(self, feedback_fn):
2718     """Computes the list of nodes and their attributes.
2719
2720     """
2721     instance_names = self.wanted
2722     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2723                      in instance_names]
2724
2725     # begin data gathering
2726
2727     nodes = frozenset([inst.primary_node for inst in instance_list])
2728
2729     bad_nodes = []
2730     if self.dynamic_fields.intersection(self.op.output_fields):
2731       live_data = {}
2732       node_data = rpc.call_all_instances_info(nodes)
2733       for name in nodes:
2734         result = node_data[name]
2735         if result:
2736           live_data.update(result)
2737         elif result == False:
2738           bad_nodes.append(name)
2739         # else no instance is alive
2740     else:
2741       live_data = dict([(name, {}) for name in instance_names])
2742
2743     # end data gathering
2744
2745     output = []
2746     for instance in instance_list:
2747       iout = []
2748       for field in self.op.output_fields:
2749         if field == "name":
2750           val = instance.name
2751         elif field == "os":
2752           val = instance.os
2753         elif field == "pnode":
2754           val = instance.primary_node
2755         elif field == "snodes":
2756           val = list(instance.secondary_nodes)
2757         elif field == "admin_state":
2758           val = (instance.status != "down")
2759         elif field == "oper_state":
2760           if instance.primary_node in bad_nodes:
2761             val = None
2762           else:
2763             val = bool(live_data.get(instance.name))
2764         elif field == "status":
2765           if instance.primary_node in bad_nodes:
2766             val = "ERROR_nodedown"
2767           else:
2768             running = bool(live_data.get(instance.name))
2769             if running:
2770               if instance.status != "down":
2771                 val = "running"
2772               else:
2773                 val = "ERROR_up"
2774             else:
2775               if instance.status != "down":
2776                 val = "ERROR_down"
2777               else:
2778                 val = "ADMIN_down"
2779         elif field == "admin_ram":
2780           val = instance.memory
2781         elif field == "oper_ram":
2782           if instance.primary_node in bad_nodes:
2783             val = None
2784           elif instance.name in live_data:
2785             val = live_data[instance.name].get("memory", "?")
2786           else:
2787             val = "-"
2788         elif field == "disk_template":
2789           val = instance.disk_template
2790         elif field == "ip":
2791           val = instance.nics[0].ip
2792         elif field == "bridge":
2793           val = instance.nics[0].bridge
2794         elif field == "mac":
2795           val = instance.nics[0].mac
2796         elif field == "sda_size" or field == "sdb_size":
2797           disk = instance.FindDisk(field[:3])
2798           if disk is None:
2799             val = None
2800           else:
2801             val = disk.size
2802         elif field == "vcpus":
2803           val = instance.vcpus
2804         elif field == "tags":
2805           val = list(instance.GetTags())
2806         elif field == "auto_balance":
2807           val = instance.auto_balance
2808         elif field in ("network_port", "kernel_path", "initrd_path",
2809                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2810                        "hvm_cdrom_image_path", "hvm_nic_type",
2811                        "hvm_disk_type", "vnc_bind_address"):
2812           val = getattr(instance, field, None)
2813           if val is not None:
2814             pass
2815           elif field in ("hvm_nic_type", "hvm_disk_type",
2816                          "kernel_path", "initrd_path"):
2817             val = "default"
2818           else:
2819             val = "-"
2820         else:
2821           raise errors.ParameterError(field)
2822         iout.append(val)
2823       output.append(iout)
2824
2825     return output
2826
2827
2828 class LUFailoverInstance(LogicalUnit):
2829   """Failover an instance.
2830
2831   """
2832   HPATH = "instance-failover"
2833   HTYPE = constants.HTYPE_INSTANCE
2834   _OP_REQP = ["instance_name", "ignore_consistency"]
2835
2836   def BuildHooksEnv(self):
2837     """Build hooks env.
2838
2839     This runs on master, primary and secondary nodes of the instance.
2840
2841     """
2842     env = {
2843       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2844       }
2845     env.update(_BuildInstanceHookEnvByObject(self.instance))
2846     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2847     return env, nl, nl
2848
2849   def CheckPrereq(self):
2850     """Check prerequisites.
2851
2852     This checks that the instance is in the cluster.
2853
2854     """
2855     instance = self.cfg.GetInstanceInfo(
2856       self.cfg.ExpandInstanceName(self.op.instance_name))
2857     if instance is None:
2858       raise errors.OpPrereqError("Instance '%s' not known" %
2859                                  self.op.instance_name)
2860
2861     if instance.disk_template not in constants.DTS_NET_MIRROR:
2862       raise errors.OpPrereqError("Instance's disk layout is not"
2863                                  " network mirrored, cannot failover.")
2864
2865     secondary_nodes = instance.secondary_nodes
2866     if not secondary_nodes:
2867       raise errors.ProgrammerError("no secondary node but using "
2868                                    "DT_REMOTE_RAID1 template")
2869
2870     target_node = secondary_nodes[0]
2871     # check memory requirements on the secondary node
2872     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2873                          instance.name, instance.memory)
2874
2875     # check bridge existance
2876     brlist = [nic.bridge for nic in instance.nics]
2877     if not rpc.call_bridges_exist(target_node, brlist):
2878       raise errors.OpPrereqError("One or more target bridges %s does not"
2879                                  " exist on destination node '%s'" %
2880                                  (brlist, target_node))
2881
2882     self.instance = instance
2883
2884   def Exec(self, feedback_fn):
2885     """Failover an instance.
2886
2887     The failover is done by shutting it down on its present node and
2888     starting it on the secondary.
2889
2890     """
2891     instance = self.instance
2892
2893     source_node = instance.primary_node
2894     target_node = instance.secondary_nodes[0]
2895
2896     feedback_fn("* checking disk consistency between source and target")
2897     for dev in instance.disks:
2898       # for remote_raid1, these are md over drbd
2899       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2900         if instance.status == "up" and not self.op.ignore_consistency:
2901           raise errors.OpExecError("Disk %s is degraded on target node,"
2902                                    " aborting failover." % dev.iv_name)
2903
2904     feedback_fn("* shutting down instance on source node")
2905     logger.Info("Shutting down instance %s on node %s" %
2906                 (instance.name, source_node))
2907
2908     if not rpc.call_instance_shutdown(source_node, instance):
2909       if self.op.ignore_consistency:
2910         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2911                      " anyway. Please make sure node %s is down"  %
2912                      (instance.name, source_node, source_node))
2913       else:
2914         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2915                                  (instance.name, source_node))
2916
2917     feedback_fn("* deactivating the instance's disks on source node")
2918     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2919       raise errors.OpExecError("Can't shut down the instance's disks.")
2920
2921     instance.primary_node = target_node
2922     # distribute new instance config to the other nodes
2923     self.cfg.Update(instance)
2924
2925     # Only start the instance if it's marked as up
2926     if instance.status == "up":
2927       feedback_fn("* activating the instance's disks on target node")
2928       logger.Info("Starting instance %s on node %s" %
2929                   (instance.name, target_node))
2930
2931       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2932                                                ignore_secondaries=True)
2933       if not disks_ok:
2934         _ShutdownInstanceDisks(instance, self.cfg)
2935         raise errors.OpExecError("Can't activate the instance's disks")
2936
2937       feedback_fn("* starting the instance on the target node")
2938       if not rpc.call_instance_start(target_node, instance, None):
2939         _ShutdownInstanceDisks(instance, self.cfg)
2940         raise errors.OpExecError("Could not start instance %s on node %s." %
2941                                  (instance.name, target_node))
2942
2943
2944 class LUMigrateInstance(LogicalUnit):
2945   """Migrate an instance.
2946
2947   This is migration without shutting down, compared to the failover,
2948   which is done with shutdown.
2949
2950   """
2951   HPATH = "instance-migrate"
2952   HTYPE = constants.HTYPE_INSTANCE
2953   _OP_REQP = ["instance_name", "live", "cleanup"]
2954
2955   def BuildHooksEnv(self):
2956     """Build hooks env.
2957
2958     This runs on master, primary and secondary nodes of the instance.
2959
2960     """
2961     env = _BuildInstanceHookEnvByObject(self.instance)
2962     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2963     return env, nl, nl
2964
2965   def CheckPrereq(self):
2966     """Check prerequisites.
2967
2968     This checks that the instance is in the cluster.
2969
2970     """
2971     instance = self.cfg.GetInstanceInfo(
2972       self.cfg.ExpandInstanceName(self.op.instance_name))
2973     if instance is None:
2974       raise errors.OpPrereqError("Instance '%s' not known" %
2975                                  self.op.instance_name)
2976
2977     if instance.disk_template != constants.DT_DRBD8:
2978       raise errors.OpPrereqError("Instance's disk layout is not"
2979                                  " drbd8, cannot migrate.")
2980
2981     secondary_nodes = instance.secondary_nodes
2982     if not secondary_nodes:
2983       raise errors.ProgrammerError("no secondary node but using "
2984                                    "drbd8 disk template")
2985
2986     target_node = secondary_nodes[0]
2987     # check memory requirements on the secondary node
2988     _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2989                          instance.name, instance.memory)
2990
2991     # check bridge existance
2992     brlist = [nic.bridge for nic in instance.nics]
2993     if not rpc.call_bridges_exist(target_node, brlist):
2994       raise errors.OpPrereqError("One or more target bridges %s does not"
2995                                  " exist on destination node '%s'" %
2996                                  (brlist, target_node))
2997
2998     if not self.op.cleanup:
2999       migratable = rpc.call_instance_migratable(instance.primary_node,
3000                                                 instance)
3001       if not migratable:
3002         raise errors.OpPrereqError("Can't contact node '%s'" %
3003                                    instance.primary_node)
3004       if not migratable[0]:
3005         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3006                                    migratable[1])
3007
3008     self.instance = instance
3009
3010   def _WaitUntilSync(self):
3011     """Poll with custom rpc for disk sync.
3012
3013     This uses our own step-based rpc call.
3014
3015     """
3016     self.feedback_fn("* wait until resync is done")
3017     all_done = False
3018     while not all_done:
3019       all_done = True
3020       result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3021                                           self.instance.disks,
3022                                           self.nodes_ip, False,
3023                                           constants.DRBD_RECONF_RPC_WFSYNC)
3024       min_percent = 100
3025       for node in self.all_nodes:
3026         if not result[node] or not result[node][0]:
3027           raise errors.OpExecError("Cannot resync disks on node %s" % (node,))
3028         node_done, node_percent = result[node][1]
3029         all_done = all_done and node_done
3030         if node_percent is not None:
3031           min_percent = min(min_percent, node_percent)
3032       if not all_done:
3033         if min_percent < 100:
3034           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3035         time.sleep(2)
3036
3037   def _EnsureSecondary(self, node):
3038     """Demote a node to secondary.
3039
3040     """
3041     self.feedback_fn("* switching node %s to secondary mode" % node)
3042     result = rpc.call_drbd_reconfig_net([node], self.instance.name,
3043                                         self.instance.disks,
3044                                         self.nodes_ip, False,
3045                                         constants.DRBD_RECONF_RPC_SECONDARY)
3046     if not result[node] or not result[node][0]:
3047         raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3048                                  " error %s" %
3049                                  (node, result[node][1]))
3050
3051   def _GoStandalone(self):
3052     """Disconnect from the network.
3053
3054     """
3055     self.feedback_fn("* changing into standalone mode")
3056     result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3057                                         self.instance.disks,
3058                                         self.nodes_ip, True,
3059                                         constants.DRBD_RECONF_RPC_DISCONNECT)
3060     for node in self.all_nodes:
3061       if not result[node] or not result[node][0]:
3062         raise errors.OpExecError("Cannot disconnect disks node %s,"
3063                                  " error %s" % (node, result[node][1]))
3064
3065   def _GoReconnect(self, multimaster):
3066     """Reconnect to the network.
3067
3068     """
3069     if multimaster:
3070       msg = "dual-master"
3071     else:
3072       msg = "single-master"
3073     self.feedback_fn("* changing disks into %s mode" % msg)
3074     result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3075                                         self.instance.disks,
3076                                         self.nodes_ip,
3077                                         multimaster,
3078                                         constants.DRBD_RECONF_RPC_RECONNECT)
3079     for node in self.all_nodes:
3080       if not result[node] or not result[node][0]:
3081         raise errors.OpExecError("Cannot change disks config on node %s,"
3082                                  " error %s" % (node, result[node][1]))
3083
3084   def _IdentifyDisks(self):
3085     """Start the migration RPC sequence.
3086
3087     """
3088     self.feedback_fn("* identifying disks")
3089     result = rpc.call_drbd_reconfig_net(self.all_nodes,
3090                                         self.instance.name,
3091                                         self.instance.disks,
3092                                         self.nodes_ip, True,
3093                                         constants.DRBD_RECONF_RPC_INIT)
3094     for node in self.all_nodes:
3095       if not result[node] or not result[node][0]:
3096         raise errors.OpExecError("Cannot identify disks node %s,"
3097                                  " error %s" % (node, result[node][1]))
3098
3099   def _ExecCleanup(self):
3100     """Try to cleanup after a failed migration.
3101
3102     The cleanup is done by:
3103       - check that the instance is running only on one node
3104         (and update the config if needed)
3105       - change disks on its secondary node to secondary
3106       - wait until disks are fully synchronized
3107       - disconnect from the network
3108       - change disks into single-master mode
3109       - wait again until disks are fully synchronized
3110
3111     """
3112     instance = self.instance
3113     target_node = self.target_node
3114     source_node = self.source_node
3115
3116     # check running on only one node
3117     self.feedback_fn("* checking where the instance actually runs"
3118                      " (if this hangs, the hypervisor might be in"
3119                      " a bad state)")
3120     ins_l = rpc.call_instance_list(self.all_nodes)
3121     for node in self.all_nodes:
3122       if not type(ins_l[node]) is list:
3123         raise errors.OpExecError("Can't contact node '%s'" % node)
3124
3125     runningon_source = instance.name in ins_l[source_node]
3126     runningon_target = instance.name in ins_l[target_node]
3127
3128     if runningon_source and runningon_target:
3129       raise errors.OpExecError("Instance seems to be running on two nodes,"
3130                                " or the hypervisor is confused. You will have"
3131                                " to ensure manually that it runs only on one"
3132                                " and restart this operation.")
3133
3134     if not (runningon_source or runningon_target):
3135       raise errors.OpExecError("Instance does not seem to be running at all."
3136                                " In this case, it's safer to repair by"
3137                                " running 'gnt-instance stop' to ensure disk"
3138                                " shutdown, and then restarting it.")
3139
3140     if runningon_target:
3141       # the migration has actually succeeded, we need to update the config
3142       self.feedback_fn("* instance running on secondary node (%s),"
3143                        " updating config" % target_node)
3144       instance.primary_node = target_node
3145       self.cfg.Update(instance)
3146       demoted_node = source_node
3147     else:
3148       self.feedback_fn("* instance confirmed to be running on its"
3149                        " primary node (%s)" % source_node)
3150       demoted_node = target_node
3151
3152     self._IdentifyDisks()
3153
3154     self._EnsureSecondary(demoted_node)
3155     self._WaitUntilSync()
3156     self._GoStandalone()
3157     self._GoReconnect(False)
3158     self._WaitUntilSync()
3159
3160     self.feedback_fn("* done")
3161
3162   def _ExecMigration(self):
3163     """Migrate an instance.
3164
3165     The migrate is done by:
3166       - change the disks into dual-master mode
3167       - wait until disks are fully synchronized again
3168       - migrate the instance
3169       - change disks on the new secondary node (the old primary) to secondary
3170       - wait until disks are fully synchronized
3171       - change disks into single-master mode
3172
3173     """
3174     instance = self.instance
3175     target_node = self.target_node
3176     source_node = self.source_node
3177
3178     self.feedback_fn("* checking disk consistency between source and target")
3179     for dev in instance.disks:
3180       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3181         raise errors.OpExecError("Disk %s is degraded or not fully"
3182                                  " synchronized on target node,"
3183                                  " aborting migrate." % dev.iv_name)
3184
3185     self._IdentifyDisks()
3186
3187     self._EnsureSecondary(target_node)
3188     self._GoStandalone()
3189     self._GoReconnect(True)
3190     self._WaitUntilSync()
3191
3192     self.feedback_fn("* migrating instance to %s" % target_node)
3193     time.sleep(10)
3194     result = rpc.call_instance_migrate(source_node, instance,
3195                                        self.nodes_ip[target_node],
3196                                        self.op.live)
3197     if not result or not result[0]:
3198       logger.Error("Instance migration failed, trying to revert disk status")
3199       try:
3200         self._EnsureSecondary(target_node)
3201         self._GoStandalone()
3202         self._GoReconnect(False)
3203         self._WaitUntilSync()
3204       except errors.OpExecError, err:
3205         logger.Error("Can't reconnect the drives: error '%s'\n"
3206                      "Please look and recover the instance status" % str(err))
3207
3208       raise errors.OpExecError("Could not migrate instance %s: %s" %
3209                                (instance.name, result[1]))
3210     time.sleep(10)
3211
3212     instance.primary_node = target_node
3213     # distribute new instance config to the other nodes
3214     self.cfg.Update(instance)
3215
3216     self._EnsureSecondary(source_node)
3217     self._WaitUntilSync()
3218     self._GoStandalone()
3219     self._GoReconnect(False)
3220     self._WaitUntilSync()
3221
3222     self.feedback_fn("* done")
3223
3224   def Exec(self, feedback_fn):
3225     """Perform the migration.
3226
3227     """
3228     self.feedback_fn = feedback_fn
3229
3230     self.source_node = self.instance.primary_node
3231     self.target_node = self.instance.secondary_nodes[0]
3232     self.all_nodes = [self.source_node, self.target_node]
3233     self.nodes_ip = {
3234       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3235       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3236       }
3237     if self.op.cleanup:
3238       return self._ExecCleanup()
3239     else:
3240       return self._ExecMigration()
3241
3242
3243 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3244   """Create a tree of block devices on the primary node.
3245
3246   This always creates all devices.
3247
3248   """
3249   if device.children:
3250     for child in device.children:
3251       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3252         return False
3253
3254   cfg.SetDiskID(device, node)
3255   new_id = rpc.call_blockdev_create(node, device, device.size,
3256                                     instance.name, True, info)
3257   if not new_id:
3258     return False
3259   if device.physical_id is None:
3260     device.physical_id = new_id
3261   return True
3262
3263
3264 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3265   """Create a tree of block devices on a secondary node.
3266
3267   If this device type has to be created on secondaries, create it and
3268   all its children.
3269
3270   If not, just recurse to children keeping the same 'force' value.
3271
3272   """
3273   if device.CreateOnSecondary():
3274     force = True
3275   if device.children:
3276     for child in device.children:
3277       if not _CreateBlockDevOnSecondary(cfg, node, instance,
3278                                         child, force, info):
3279         return False
3280
3281   if not force:
3282     return True
3283   cfg.SetDiskID(device, node)
3284   new_id = rpc.call_blockdev_create(node, device, device.size,
3285                                     instance.name, False, info)
3286   if not new_id:
3287     return False
3288   if device.physical_id is None:
3289     device.physical_id = new_id
3290   return True
3291
3292
3293 def _GenerateUniqueNames(cfg, exts):
3294   """Generate a suitable LV name.
3295
3296   This will generate a logical volume name for the given instance.
3297
3298   """
3299   results = []
3300   for val in exts:
3301     new_id = cfg.GenerateUniqueID()
3302     results.append("%s%s" % (new_id, val))
3303   return results
3304
3305
3306 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3307   """Generate a drbd device complete with its children.
3308
3309   """
3310   port = cfg.AllocatePort()
3311   vgname = cfg.GetVGName()
3312   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3313                           logical_id=(vgname, names[0]))
3314   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3315                           logical_id=(vgname, names[1]))
3316   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3317                           logical_id = (primary, secondary, port),
3318                           children = [dev_data, dev_meta])
3319   return drbd_dev
3320
3321
3322 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3323   """Generate a drbd8 device complete with its children.
3324
3325   """
3326   port = cfg.AllocatePort()
3327   vgname = cfg.GetVGName()
3328   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3329                           logical_id=(vgname, names[0]))
3330   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3331                           logical_id=(vgname, names[1]))
3332   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3333                           logical_id = (primary, secondary, port),
3334                           children = [dev_data, dev_meta],
3335                           iv_name=iv_name)
3336   return drbd_dev
3337
3338 def _GenerateDiskTemplate(cfg, template_name,
3339                           instance_name, primary_node,
3340                           secondary_nodes, disk_sz, swap_sz):
3341   """Generate the entire disk layout for a given template type.
3342
3343   """
3344   #TODO: compute space requirements
3345
3346   vgname = cfg.GetVGName()
3347   if template_name == constants.DT_DISKLESS:
3348     disks = []
3349   elif template_name == constants.DT_PLAIN:
3350     if len(secondary_nodes) != 0:
3351       raise errors.ProgrammerError("Wrong template configuration")
3352
3353     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3354     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3355                            logical_id=(vgname, names[0]),
3356                            iv_name = "sda")
3357     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3358                            logical_id=(vgname, names[1]),
3359                            iv_name = "sdb")
3360     disks = [sda_dev, sdb_dev]
3361   elif template_name == constants.DT_LOCAL_RAID1:
3362     if len(secondary_nodes) != 0:
3363       raise errors.ProgrammerError("Wrong template configuration")
3364
3365
3366     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3367                                        ".sdb_m1", ".sdb_m2"])
3368     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3369                               logical_id=(vgname, names[0]))
3370     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3371                               logical_id=(vgname, names[1]))
3372     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3373                               size=disk_sz,
3374                               children = [sda_dev_m1, sda_dev_m2])
3375     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3376                               logical_id=(vgname, names[2]))
3377     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3378                               logical_id=(vgname, names[3]))
3379     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3380                               size=swap_sz,
3381                               children = [sdb_dev_m1, sdb_dev_m2])
3382     disks = [md_sda_dev, md_sdb_dev]
3383   elif template_name == constants.DT_REMOTE_RAID1:
3384     if len(secondary_nodes) != 1:
3385       raise errors.ProgrammerError("Wrong template configuration")
3386     remote_node = secondary_nodes[0]
3387     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3388                                        ".sdb_data", ".sdb_meta"])
3389     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3390                                          disk_sz, names[0:2])
3391     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3392                               children = [drbd_sda_dev], size=disk_sz)
3393     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3394                                          swap_sz, names[2:4])
3395     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3396                               children = [drbd_sdb_dev], size=swap_sz)
3397     disks = [md_sda_dev, md_sdb_dev]
3398   elif template_name == constants.DT_DRBD8:
3399     if len(secondary_nodes) != 1:
3400       raise errors.ProgrammerError("Wrong template configuration")
3401     remote_node = secondary_nodes[0]
3402     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3403                                        ".sdb_data", ".sdb_meta"])
3404     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3405                                          disk_sz, names[0:2], "sda")
3406     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3407                                          swap_sz, names[2:4], "sdb")
3408     disks = [drbd_sda_dev, drbd_sdb_dev]
3409   else:
3410     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3411   return disks
3412
3413
3414 def _GetInstanceInfoText(instance):
3415   """Compute that text that should be added to the disk's metadata.
3416
3417   """
3418   return "originstname+%s" % instance.name
3419
3420
3421 def _CreateDisks(cfg, instance):
3422   """Create all disks for an instance.
3423
3424   This abstracts away some work from AddInstance.
3425
3426   Args:
3427     instance: the instance object
3428
3429   Returns:
3430     True or False showing the success of the creation process
3431
3432   """
3433   info = _GetInstanceInfoText(instance)
3434
3435   for device in instance.disks:
3436     logger.Info("creating volume %s for instance %s" %
3437               (device.iv_name, instance.name))
3438     #HARDCODE
3439     for secondary_node in instance.secondary_nodes:
3440       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3441                                         device, False, info):
3442         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3443                      (device.iv_name, device, secondary_node))
3444         return False
3445     #HARDCODE
3446     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3447                                     instance, device, info):
3448       logger.Error("failed to create volume %s on primary!" %
3449                    device.iv_name)
3450       return False
3451   return True
3452
3453
3454 def _RemoveDisks(instance, cfg):
3455   """Remove all disks for an instance.
3456
3457   This abstracts away some work from `AddInstance()` and
3458   `RemoveInstance()`. Note that in case some of the devices couldn't
3459   be removed, the removal will continue with the other ones (compare
3460   with `_CreateDisks()`).
3461
3462   Args:
3463     instance: the instance object
3464
3465   Returns:
3466     True or False showing the success of the removal proces
3467
3468   """
3469   logger.Info("removing block devices for instance %s" % instance.name)
3470
3471   result = True
3472   for device in instance.disks:
3473     for node, disk in device.ComputeNodeTree(instance.primary_node):
3474       cfg.SetDiskID(disk, node)
3475       if not rpc.call_blockdev_remove(node, disk):
3476         logger.Error("could not remove block device %s on node %s,"
3477                      " continuing anyway" %
3478                      (device.iv_name, node))
3479         result = False
3480   return result
3481
3482
3483 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3484   """Compute disk size requirements in the volume group
3485
3486   This is currently hard-coded for the two-drive layout.
3487
3488   """
3489   # Required free disk space as a function of disk and swap space
3490   req_size_dict = {
3491     constants.DT_DISKLESS: None,
3492     constants.DT_PLAIN: disk_size + swap_size,
3493     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3494     # 256 MB are added for drbd metadata, 128MB for each drbd device
3495     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3496     constants.DT_DRBD8: disk_size + swap_size + 256,
3497   }
3498
3499   if disk_template not in req_size_dict:
3500     raise errors.ProgrammerError("Disk template '%s' size requirement"
3501                                  " is unknown" %  disk_template)
3502
3503   return req_size_dict[disk_template]
3504
3505
3506 class LUCreateInstance(LogicalUnit):
3507   """Create an instance.
3508
3509   """
3510   HPATH = "instance-add"
3511   HTYPE = constants.HTYPE_INSTANCE
3512   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3513               "disk_template", "swap_size", "mode", "start", "vcpus",
3514               "wait_for_sync", "ip_check", "mac", "auto_balance"]
3515
3516   def _RunAllocator(self):
3517     """Run the allocator based on input opcode.
3518
3519     """
3520     disks = [{"size": self.op.disk_size, "mode": "w"},
3521              {"size": self.op.swap_size, "mode": "w"}]
3522     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3523              "bridge": self.op.bridge}]
3524     ial = IAllocator(self.cfg, self.sstore,
3525                      mode=constants.IALLOCATOR_MODE_ALLOC,
3526                      name=self.op.instance_name,
3527                      disk_template=self.op.disk_template,
3528                      tags=[],
3529                      os=self.op.os_type,
3530                      vcpus=self.op.vcpus,
3531                      mem_size=self.op.mem_size,
3532                      disks=disks,
3533                      nics=nics,
3534                      )
3535
3536     ial.Run(self.op.iallocator)
3537
3538     if not ial.success:
3539       raise errors.OpPrereqError("Can't compute nodes using"
3540                                  " iallocator '%s': %s" % (self.op.iallocator,
3541                                                            ial.info))
3542     if len(ial.nodes) != ial.required_nodes:
3543       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3544                                  " of nodes (%s), required %s" %
3545                                  (len(ial.nodes), ial.required_nodes))
3546     self.op.pnode = ial.nodes[0]
3547     logger.ToStdout("Selected nodes for the instance: %s" %
3548                     (", ".join(ial.nodes),))
3549     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3550                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3551     if ial.required_nodes == 2:
3552       self.op.snode = ial.nodes[1]
3553
3554   def BuildHooksEnv(self):
3555     """Build hooks env.
3556
3557     This runs on master, primary and secondary nodes of the instance.
3558
3559     """
3560     env = {
3561       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3562       "INSTANCE_DISK_SIZE": self.op.disk_size,
3563       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3564       "INSTANCE_ADD_MODE": self.op.mode,
3565       }
3566     if self.op.mode == constants.INSTANCE_IMPORT:
3567       env["INSTANCE_SRC_NODE"] = self.op.src_node
3568       env["INSTANCE_SRC_PATH"] = self.op.src_path
3569       env["INSTANCE_SRC_IMAGE"] = self.src_image
3570
3571     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3572       primary_node=self.op.pnode,
3573       secondary_nodes=self.secondaries,
3574       status=self.instance_status,
3575       os_type=self.op.os_type,
3576       memory=self.op.mem_size,
3577       vcpus=self.op.vcpus,
3578       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3579     ))
3580
3581     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3582           self.secondaries)
3583     return env, nl, nl
3584
3585
3586   def CheckPrereq(self):
3587     """Check prerequisites.
3588
3589     """
3590     # set optional parameters to none if they don't exist
3591     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3592                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3593                  "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3594       if not hasattr(self.op, attr):
3595         setattr(self.op, attr, None)
3596
3597     if self.op.mode not in (constants.INSTANCE_CREATE,
3598                             constants.INSTANCE_IMPORT):
3599       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3600                                  self.op.mode)
3601
3602     if self.op.mode == constants.INSTANCE_IMPORT:
3603       src_node = getattr(self.op, "src_node", None)
3604       src_path = getattr(self.op, "src_path", None)
3605       if src_node is None or src_path is None:
3606         raise errors.OpPrereqError("Importing an instance requires source"
3607                                    " node and path options")
3608       src_node_full = self.cfg.ExpandNodeName(src_node)
3609       if src_node_full is None:
3610         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3611       self.op.src_node = src_node = src_node_full
3612
3613       if not os.path.isabs(src_path):
3614         raise errors.OpPrereqError("The source path must be absolute")
3615
3616       export_info = rpc.call_export_info(src_node, src_path)
3617
3618       if not export_info:
3619         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3620
3621       if not export_info.has_section(constants.INISECT_EXP):
3622         raise errors.ProgrammerError("Corrupted export config")
3623
3624       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3625       if (int(ei_version) != constants.EXPORT_VERSION):
3626         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3627                                    (ei_version, constants.EXPORT_VERSION))
3628
3629       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3630         raise errors.OpPrereqError("Can't import instance with more than"
3631                                    " one data disk")
3632
3633       # FIXME: are the old os-es, disk sizes, etc. useful?
3634       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3635       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3636                                                          'disk0_dump'))
3637       self.src_image = diskimage
3638     else: # INSTANCE_CREATE
3639       if getattr(self.op, "os_type", None) is None:
3640         raise errors.OpPrereqError("No guest OS specified")
3641
3642     #### instance parameters check
3643
3644     # disk template and mirror node verification
3645     if self.op.disk_template not in constants.DISK_TEMPLATES:
3646       raise errors.OpPrereqError("Invalid disk template name")
3647
3648     # instance name verification
3649     hostname1 = utils.HostInfo(self.op.instance_name)
3650
3651     self.op.instance_name = instance_name = hostname1.name
3652     instance_list = self.cfg.GetInstanceList()
3653     if instance_name in instance_list:
3654       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3655                                  instance_name)
3656
3657     # ip validity checks
3658     ip = getattr(self.op, "ip", None)
3659     if ip is None or ip.lower() == "none":
3660       inst_ip = None
3661     elif ip.lower() == "auto":
3662       inst_ip = hostname1.ip
3663     else:
3664       if not utils.IsValidIP(ip):
3665         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3666                                    " like a valid IP" % ip)
3667       inst_ip = ip
3668     self.inst_ip = self.op.ip = inst_ip
3669
3670     if self.op.start and not self.op.ip_check:
3671       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3672                                  " adding an instance in start mode")
3673
3674     if self.op.ip_check:
3675       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3676         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3677                                    (hostname1.ip, instance_name))
3678
3679     # MAC address verification
3680     if self.op.mac != "auto":
3681       if not utils.IsValidMac(self.op.mac.lower()):
3682         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3683                                    self.op.mac)
3684
3685     # bridge verification
3686     bridge = getattr(self.op, "bridge", None)
3687     if bridge is None:
3688       self.op.bridge = self.cfg.GetDefBridge()
3689     else:
3690       self.op.bridge = bridge
3691
3692     # boot order verification
3693     if self.op.hvm_boot_order is not None:
3694       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3695         raise errors.OpPrereqError("invalid boot order specified,"
3696                                    " must be one or more of [acdn]")
3697     #### allocator run
3698
3699     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3700       raise errors.OpPrereqError("One and only one of iallocator and primary"
3701                                  " node must be given")
3702
3703     if self.op.iallocator is not None:
3704       self._RunAllocator()
3705
3706     #### node related checks
3707
3708     # check primary node
3709     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3710     if pnode is None:
3711       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3712                                  self.op.pnode)
3713     self.op.pnode = pnode.name
3714     self.pnode = pnode
3715     self.secondaries = []
3716
3717     # mirror node verification
3718     if self.op.disk_template in constants.DTS_NET_MIRROR:
3719       if getattr(self.op, "snode", None) is None:
3720         raise errors.OpPrereqError("The networked disk templates need"
3721                                    " a mirror node")
3722
3723       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3724       if snode_name is None:
3725         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3726                                    self.op.snode)
3727       elif snode_name == pnode.name:
3728         raise errors.OpPrereqError("The secondary node cannot be"
3729                                    " the primary node.")
3730       self.secondaries.append(snode_name)
3731
3732     req_size = _ComputeDiskSize(self.op.disk_template,
3733                                 self.op.disk_size, self.op.swap_size)
3734
3735     # Check lv size requirements
3736     if req_size is not None:
3737       nodenames = [pnode.name] + self.secondaries
3738       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3739       for node in nodenames:
3740         info = nodeinfo.get(node, None)
3741         if not info:
3742           raise errors.OpPrereqError("Cannot get current information"
3743                                      " from node '%s'" % node)
3744         vg_free = info.get('vg_free', None)
3745         if not isinstance(vg_free, int):
3746           raise errors.OpPrereqError("Can't compute free disk space on"
3747                                      " node %s" % node)
3748         if req_size > info['vg_free']:
3749           raise errors.OpPrereqError("Not enough disk space on target node %s."
3750                                      " %d MB available, %d MB required" %
3751                                      (node, info['vg_free'], req_size))
3752
3753     # os verification
3754     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3755     if not os_obj:
3756       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3757                                  " primary node"  % self.op.os_type)
3758
3759     if self.op.kernel_path == constants.VALUE_NONE:
3760       raise errors.OpPrereqError("Can't set instance kernel to none")
3761
3762
3763     # bridge check on primary node
3764     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3765       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3766                                  " destination node '%s'" %
3767                                  (self.op.bridge, pnode.name))
3768
3769     # memory check on primary node
3770     if self.op.start:
3771       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3772                            "creating instance %s" % self.op.instance_name,
3773                            self.op.mem_size)
3774
3775     # hvm_cdrom_image_path verification
3776     if self.op.hvm_cdrom_image_path is not None:
3777       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3778         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3779                                    " be an absolute path or None, not %s" %
3780                                    self.op.hvm_cdrom_image_path)
3781       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3782         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3783                                    " regular file or a symlink pointing to"
3784                                    " an existing regular file, not %s" %
3785                                    self.op.hvm_cdrom_image_path)
3786
3787     # vnc_bind_address verification
3788     if self.op.vnc_bind_address is not None:
3789       if not utils.IsValidIP(self.op.vnc_bind_address):
3790         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3791                                    " like a valid IP address" %
3792                                    self.op.vnc_bind_address)
3793
3794     # Xen HVM device type checks
3795     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3796       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3797         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3798                                    " hypervisor" % self.op.hvm_nic_type)
3799       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3800         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3801                                    " hypervisor" % self.op.hvm_disk_type)
3802
3803     if self.op.start:
3804       self.instance_status = 'up'
3805     else:
3806       self.instance_status = 'down'
3807
3808   def Exec(self, feedback_fn):
3809     """Create and add the instance to the cluster.
3810
3811     """
3812     instance = self.op.instance_name
3813     pnode_name = self.pnode.name
3814
3815     if self.op.mac == "auto":
3816       mac_address = self.cfg.GenerateMAC()
3817     else:
3818       mac_address = self.op.mac
3819
3820     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3821     if self.inst_ip is not None:
3822       nic.ip = self.inst_ip
3823
3824     ht_kind = self.sstore.GetHypervisorType()
3825     if ht_kind in constants.HTS_REQ_PORT:
3826       network_port = self.cfg.AllocatePort()
3827     else:
3828       network_port = None
3829
3830     if self.op.vnc_bind_address is None:
3831       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3832
3833     disks = _GenerateDiskTemplate(self.cfg,
3834                                   self.op.disk_template,
3835                                   instance, pnode_name,
3836                                   self.secondaries, self.op.disk_size,
3837                                   self.op.swap_size)
3838
3839     iobj = objects.Instance(name=instance, os=self.op.os_type,
3840                             primary_node=pnode_name,
3841                             memory=self.op.mem_size,
3842                             vcpus=self.op.vcpus,
3843                             nics=[nic], disks=disks,
3844                             disk_template=self.op.disk_template,
3845                             status=self.instance_status,
3846                             network_port=network_port,
3847                             kernel_path=self.op.kernel_path,
3848                             initrd_path=self.op.initrd_path,
3849                             hvm_boot_order=self.op.hvm_boot_order,
3850                             hvm_acpi=self.op.hvm_acpi,
3851                             hvm_pae=self.op.hvm_pae,
3852                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3853                             vnc_bind_address=self.op.vnc_bind_address,
3854                             hvm_nic_type=self.op.hvm_nic_type,
3855                             hvm_disk_type=self.op.hvm_disk_type,
3856                             auto_balance=bool(self.op.auto_balance),
3857                             )
3858
3859     feedback_fn("* creating instance disks...")
3860     if not _CreateDisks(self.cfg, iobj):
3861       _RemoveDisks(iobj, self.cfg)
3862       raise errors.OpExecError("Device creation failed, reverting...")
3863
3864     feedback_fn("adding instance %s to cluster config" % instance)
3865
3866     self.cfg.AddInstance(iobj)
3867
3868     if self.op.wait_for_sync:
3869       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3870     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3871       # make sure the disks are not degraded (still sync-ing is ok)
3872       time.sleep(15)
3873       feedback_fn("* checking mirrors status")
3874       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3875     else:
3876       disk_abort = False
3877
3878     if disk_abort:
3879       _RemoveDisks(iobj, self.cfg)
3880       self.cfg.RemoveInstance(iobj.name)
3881       raise errors.OpExecError("There are some degraded disks for"
3882                                " this instance")
3883
3884     feedback_fn("creating os for instance %s on node %s" %
3885                 (instance, pnode_name))
3886
3887     if iobj.disk_template != constants.DT_DISKLESS:
3888       if self.op.mode == constants.INSTANCE_CREATE:
3889         feedback_fn("* running the instance OS create scripts...")
3890         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3891           raise errors.OpExecError("could not add os for instance %s"
3892                                    " on node %s" %
3893                                    (instance, pnode_name))
3894
3895       elif self.op.mode == constants.INSTANCE_IMPORT:
3896         feedback_fn("* running the instance OS import scripts...")
3897         src_node = self.op.src_node
3898         src_image = self.src_image
3899         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3900                                                 src_node, src_image):
3901           raise errors.OpExecError("Could not import os for instance"
3902                                    " %s on node %s" %
3903                                    (instance, pnode_name))
3904       else:
3905         # also checked in the prereq part
3906         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3907                                      % self.op.mode)
3908
3909     if self.op.start:
3910       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3911       feedback_fn("* starting instance...")
3912       if not rpc.call_instance_start(pnode_name, iobj, None):
3913         raise errors.OpExecError("Could not start instance")
3914
3915
3916 class LUConnectConsole(NoHooksLU):
3917   """Connect to an instance's console.
3918
3919   This is somewhat special in that it returns the command line that
3920   you need to run on the master node in order to connect to the
3921   console.
3922
3923   """
3924   _OP_REQP = ["instance_name"]
3925
3926   def CheckPrereq(self):
3927     """Check prerequisites.
3928
3929     This checks that the instance is in the cluster.
3930
3931     """
3932     instance = self.cfg.GetInstanceInfo(
3933       self.cfg.ExpandInstanceName(self.op.instance_name))
3934     if instance is None:
3935       raise errors.OpPrereqError("Instance '%s' not known" %
3936                                  self.op.instance_name)
3937     self.instance = instance
3938
3939   def Exec(self, feedback_fn):
3940     """Connect to the console of an instance
3941
3942     """
3943     instance = self.instance
3944     node = instance.primary_node
3945
3946     node_insts = rpc.call_instance_list([node])[node]
3947     if node_insts is False:
3948       raise errors.OpExecError("Can't connect to node %s." % node)
3949
3950     if instance.name not in node_insts:
3951       raise errors.OpExecError("Instance %s is not running." % instance.name)
3952
3953     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3954
3955     hyper = hypervisor.GetHypervisor()
3956     console_cmd = hyper.GetShellCommandForConsole(instance)
3957     # build ssh cmdline
3958     argv = ["ssh", "-q", "-t"]
3959     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3960     argv.extend(ssh.BATCH_MODE_OPTS)
3961     argv.append(node)
3962     argv.append(console_cmd)
3963     return "ssh", argv
3964
3965
3966 class LUAddMDDRBDComponent(LogicalUnit):
3967   """Adda new mirror member to an instance's disk.
3968
3969   """
3970   HPATH = "mirror-add"
3971   HTYPE = constants.HTYPE_INSTANCE
3972   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3973
3974   def BuildHooksEnv(self):
3975     """Build hooks env.
3976
3977     This runs on the master, the primary and all the secondaries.
3978
3979     """
3980     env = {
3981       "NEW_SECONDARY": self.op.remote_node,
3982       "DISK_NAME": self.op.disk_name,
3983       }
3984     env.update(_BuildInstanceHookEnvByObject(self.instance))
3985     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3986           self.op.remote_node,] + list(self.instance.secondary_nodes)
3987     return env, nl, nl
3988
3989   def CheckPrereq(self):
3990     """Check prerequisites.
3991
3992     This checks that the instance is in the cluster.
3993
3994     """
3995     instance = self.cfg.GetInstanceInfo(
3996       self.cfg.ExpandInstanceName(self.op.instance_name))
3997     if instance is None:
3998       raise errors.OpPrereqError("Instance '%s' not known" %
3999                                  self.op.instance_name)
4000     self.instance = instance
4001
4002     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4003     if remote_node is None:
4004       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
4005     self.remote_node = remote_node
4006
4007     if remote_node == instance.primary_node:
4008       raise errors.OpPrereqError("The specified node is the primary node of"
4009                                  " the instance.")
4010
4011     if instance.disk_template != constants.DT_REMOTE_RAID1:
4012       raise errors.OpPrereqError("Instance's disk layout is not"
4013                                  " remote_raid1.")
4014     for disk in instance.disks:
4015       if disk.iv_name == self.op.disk_name:
4016         break
4017     else:
4018       raise errors.OpPrereqError("Can't find this device ('%s') in the"
4019                                  " instance." % self.op.disk_name)
4020     if len(disk.children) > 1:
4021       raise errors.OpPrereqError("The device already has two slave devices."
4022                                  " This would create a 3-disk raid1 which we"
4023                                  " don't allow.")
4024     self.disk = disk
4025
4026   def Exec(self, feedback_fn):
4027     """Add the mirror component
4028
4029     """
4030     disk = self.disk
4031     instance = self.instance
4032
4033     remote_node = self.remote_node
4034     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
4035     names = _GenerateUniqueNames(self.cfg, lv_names)
4036     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
4037                                      remote_node, disk.size, names)
4038
4039     logger.Info("adding new mirror component on secondary")
4040     #HARDCODE
4041     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
4042                                       new_drbd, False,
4043                                       _GetInstanceInfoText(instance)):
4044       raise errors.OpExecError("Failed to create new component on secondary"
4045                                " node %s" % remote_node)
4046
4047     logger.Info("adding new mirror component on primary")
4048     #HARDCODE
4049     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
4050                                     instance, new_drbd,
4051                                     _GetInstanceInfoText(instance)):
4052       # remove secondary dev
4053       self.cfg.SetDiskID(new_drbd, remote_node)
4054       rpc.call_blockdev_remove(remote_node, new_drbd)
4055       raise errors.OpExecError("Failed to create volume on primary")
4056
4057     # the device exists now
4058     # call the primary node to add the mirror to md
4059     logger.Info("adding new mirror component to md")
4060     if not rpc.call_blockdev_addchildren(instance.primary_node,
4061                                          disk, [new_drbd]):
4062       logger.Error("Can't add mirror compoment to md!")
4063       self.cfg.SetDiskID(new_drbd, remote_node)
4064       if not rpc.call_blockdev_remove(remote_node, new_drbd):
4065         logger.Error("Can't rollback on secondary")
4066       self.cfg.SetDiskID(new_drbd, instance.primary_node)
4067       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4068         logger.Error("Can't rollback on primary")
4069       raise errors.OpExecError("Can't add mirror component to md array")
4070
4071     disk.children.append(new_drbd)
4072
4073     self.cfg.AddInstance(instance)
4074
4075     _WaitForSync(self.cfg, instance, self.proc)
4076
4077     return 0
4078
4079
4080 class LURemoveMDDRBDComponent(LogicalUnit):
4081   """Remove a component from a remote_raid1 disk.
4082
4083   """
4084   HPATH = "mirror-remove"
4085   HTYPE = constants.HTYPE_INSTANCE
4086   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
4087
4088   def BuildHooksEnv(self):
4089     """Build hooks env.
4090
4091     This runs on the master, the primary and all the secondaries.
4092
4093     """
4094     env = {
4095       "DISK_NAME": self.op.disk_name,
4096       "DISK_ID": self.op.disk_id,
4097       "OLD_SECONDARY": self.old_secondary,
4098       }
4099     env.update(_BuildInstanceHookEnvByObject(self.instance))
4100     nl = [self.sstore.GetMasterNode(),
4101           self.instance.primary_node] + list(self.instance.secondary_nodes)
4102     return env, nl, nl
4103
4104   def CheckPrereq(self):
4105     """Check prerequisites.
4106
4107     This checks that the instance is in the cluster.
4108
4109     """
4110     instance = self.cfg.GetInstanceInfo(
4111       self.cfg.ExpandInstanceName(self.op.instance_name))
4112     if instance is None:
4113       raise errors.OpPrereqError("Instance '%s' not known" %
4114                                  self.op.instance_name)
4115     self.instance = instance
4116
4117     if instance.disk_template != constants.DT_REMOTE_RAID1:
4118       raise errors.OpPrereqError("Instance's disk layout is not"
4119                                  " remote_raid1.")
4120     for disk in instance.disks:
4121       if disk.iv_name == self.op.disk_name:
4122         break
4123     else:
4124       raise errors.OpPrereqError("Can't find this device ('%s') in the"
4125                                  " instance." % self.op.disk_name)
4126     for child in disk.children:
4127       if (child.dev_type == constants.LD_DRBD7 and
4128           child.logical_id[2] == self.op.disk_id):
4129         break
4130     else:
4131       raise errors.OpPrereqError("Can't find the device with this port.")
4132
4133     if len(disk.children) < 2:
4134       raise errors.OpPrereqError("Cannot remove the last component from"
4135                                  " a mirror.")
4136     self.disk = disk
4137     self.child = child
4138     if self.child.logical_id[0] == instance.primary_node:
4139       oid = 1
4140     else:
4141       oid = 0
4142     self.old_secondary = self.child.logical_id[oid]
4143
4144   def Exec(self, feedback_fn):
4145     """Remove the mirror component
4146
4147     """
4148     instance = self.instance
4149     disk = self.disk
4150     child = self.child
4151     logger.Info("remove mirror component")
4152     self.cfg.SetDiskID(disk, instance.primary_node)
4153     if not rpc.call_blockdev_removechildren(instance.primary_node,
4154                                             disk, [child]):
4155       raise errors.OpExecError("Can't remove child from mirror.")
4156
4157     for node in child.logical_id[:2]:
4158       self.cfg.SetDiskID(child, node)
4159       if not rpc.call_blockdev_remove(node, child):
4160         logger.Error("Warning: failed to remove device from node %s,"
4161                      " continuing operation." % node)
4162
4163     disk.children.remove(child)
4164     self.cfg.AddInstance(instance)
4165
4166
4167 class LUReplaceDisks(LogicalUnit):
4168   """Replace the disks of an instance.
4169
4170   """
4171   HPATH = "mirrors-replace"
4172   HTYPE = constants.HTYPE_INSTANCE
4173   _OP_REQP = ["instance_name", "mode", "disks"]
4174
4175   def _RunAllocator(self):
4176     """Compute a new secondary node using an IAllocator.
4177
4178     """
4179     ial = IAllocator(self.cfg, self.sstore,
4180                      mode=constants.IALLOCATOR_MODE_RELOC,
4181                      name=self.op.instance_name,
4182                      relocate_from=[self.sec_node])
4183
4184     ial.Run(self.op.iallocator)
4185
4186     if not ial.success:
4187       raise errors.OpPrereqError("Can't compute nodes using"
4188                                  " iallocator '%s': %s" % (self.op.iallocator,
4189                                                            ial.info))
4190     if len(ial.nodes) != ial.required_nodes:
4191       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4192                                  " of nodes (%s), required %s" %
4193                                  (len(ial.nodes), ial.required_nodes))
4194     self.op.remote_node = ial.nodes[0]
4195     logger.ToStdout("Selected new secondary for the instance: %s" %
4196                     self.op.remote_node)
4197
4198   def BuildHooksEnv(self):
4199     """Build hooks env.
4200
4201     This runs on the master, the primary and all the secondaries.
4202
4203     """
4204     env = {
4205       "MODE": self.op.mode,
4206       "NEW_SECONDARY": self.op.remote_node,
4207       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4208       }
4209     env.update(_BuildInstanceHookEnvByObject(self.instance))
4210     nl = [
4211       self.sstore.GetMasterNode(),
4212       self.instance.primary_node,
4213       ]
4214     if self.op.remote_node is not None:
4215       nl.append(self.op.remote_node)
4216     return env, nl, nl
4217
4218   def CheckPrereq(self):
4219     """Check prerequisites.
4220
4221     This checks that the instance is in the cluster.
4222
4223     """
4224     if not hasattr(self.op, "remote_node"):
4225       self.op.remote_node = None
4226
4227     instance = self.cfg.GetInstanceInfo(
4228       self.cfg.ExpandInstanceName(self.op.instance_name))
4229     if instance is None:
4230       raise errors.OpPrereqError("Instance '%s' not known" %
4231                                  self.op.instance_name)
4232     self.instance = instance
4233     self.op.instance_name = instance.name
4234
4235     if instance.disk_template not in constants.DTS_NET_MIRROR:
4236       raise errors.OpPrereqError("Instance's disk layout is not"
4237                                  " network mirrored.")
4238
4239     if len(instance.secondary_nodes) != 1:
4240       raise errors.OpPrereqError("The instance has a strange layout,"
4241                                  " expected one secondary but found %d" %
4242                                  len(instance.secondary_nodes))
4243
4244     self.sec_node = instance.secondary_nodes[0]
4245
4246     ia_name = getattr(self.op, "iallocator", None)
4247     if ia_name is not None:
4248       if self.op.remote_node is not None:
4249         raise errors.OpPrereqError("Give either the iallocator or the new"
4250                                    " secondary, not both")
4251       self._RunAllocator()
4252
4253     remote_node = self.op.remote_node
4254     if remote_node is not None:
4255       remote_node = self.cfg.ExpandNodeName(remote_node)
4256       if remote_node is None:
4257         raise errors.OpPrereqError("Node '%s' not known" %
4258                                    self.op.remote_node)
4259       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4260     else:
4261       self.remote_node_info = None
4262     if remote_node == instance.primary_node:
4263       raise errors.OpPrereqError("The specified node is the primary node of"
4264                                  " the instance.")
4265     elif remote_node == self.sec_node:
4266       if self.op.mode == constants.REPLACE_DISK_SEC:
4267         # this is for DRBD8, where we can't execute the same mode of
4268         # replacement as for drbd7 (no different port allocated)
4269         raise errors.OpPrereqError("Same secondary given, cannot execute"
4270                                    " replacement")
4271       # the user gave the current secondary, switch to
4272       # 'no-replace-secondary' mode for drbd7
4273       remote_node = None
4274     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4275         self.op.mode != constants.REPLACE_DISK_ALL):
4276       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4277                                  " disks replacement, not individual ones")
4278     if instance.disk_template == constants.DT_DRBD8:
4279       if (self.op.mode == constants.REPLACE_DISK_ALL and
4280           remote_node is not None):
4281         # switch to replace secondary mode
4282         self.op.mode = constants.REPLACE_DISK_SEC
4283
4284       if self.op.mode == constants.REPLACE_DISK_ALL:
4285         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4286                                    " secondary disk replacement, not"
4287                                    " both at once")
4288       elif self.op.mode == constants.REPLACE_DISK_PRI:
4289         if remote_node is not None:
4290           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4291                                      " the secondary while doing a primary"
4292                                      " node disk replacement")
4293         self.tgt_node = instance.primary_node
4294         self.oth_node = instance.secondary_nodes[0]
4295       elif self.op.mode == constants.REPLACE_DISK_SEC:
4296         self.new_node = remote_node # this can be None, in which case
4297                                     # we don't change the secondary
4298         self.tgt_node = instance.secondary_nodes[0]
4299         self.oth_node = instance.primary_node
4300       else:
4301         raise errors.ProgrammerError("Unhandled disk replace mode")
4302
4303     for name in self.op.disks:
4304       if instance.FindDisk(name) is None:
4305         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4306                                    (name, instance.name))
4307     self.op.remote_node = remote_node
4308
4309   def _ExecRR1(self, feedback_fn):
4310     """Replace the disks of an instance.
4311
4312     """
4313     instance = self.instance
4314     iv_names = {}
4315     # start of work
4316     if self.op.remote_node is None:
4317       remote_node = self.sec_node
4318     else:
4319       remote_node = self.op.remote_node
4320     cfg = self.cfg
4321     for dev in instance.disks:
4322       size = dev.size
4323       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4324       names = _GenerateUniqueNames(cfg, lv_names)
4325       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4326                                        remote_node, size, names)
4327       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4328       logger.Info("adding new mirror component on secondary for %s" %
4329                   dev.iv_name)
4330       #HARDCODE
4331       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4332                                         new_drbd, False,
4333                                         _GetInstanceInfoText(instance)):
4334         raise errors.OpExecError("Failed to create new component on secondary"
4335                                  " node %s. Full abort, cleanup manually!" %
4336                                  remote_node)
4337
4338       logger.Info("adding new mirror component on primary")
4339       #HARDCODE
4340       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4341                                       instance, new_drbd,
4342                                       _GetInstanceInfoText(instance)):
4343         # remove secondary dev
4344         cfg.SetDiskID(new_drbd, remote_node)
4345         rpc.call_blockdev_remove(remote_node, new_drbd)
4346         raise errors.OpExecError("Failed to create volume on primary!"
4347                                  " Full abort, cleanup manually!!")
4348
4349       # the device exists now
4350       # call the primary node to add the mirror to md
4351       logger.Info("adding new mirror component to md")
4352       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4353                                            [new_drbd]):
4354         logger.Error("Can't add mirror compoment to md!")
4355         cfg.SetDiskID(new_drbd, remote_node)
4356         if not rpc.call_blockdev_remove(remote_node, new_drbd):
4357           logger.Error("Can't rollback on secondary")
4358         cfg.SetDiskID(new_drbd, instance.primary_node)
4359         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4360           logger.Error("Can't rollback on primary")
4361         raise errors.OpExecError("Full abort, cleanup manually!!")
4362
4363       dev.children.append(new_drbd)
4364       cfg.AddInstance(instance)
4365
4366     # this can fail as the old devices are degraded and _WaitForSync
4367     # does a combined result over all disks, so we don't check its
4368     # return value
4369     _WaitForSync(cfg, instance, self.proc, unlock=True)
4370
4371     # so check manually all the devices
4372     for name in iv_names:
4373       dev, child, new_drbd = iv_names[name]
4374       cfg.SetDiskID(dev, instance.primary_node)
4375       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4376       if is_degr:
4377         raise errors.OpExecError("MD device %s is degraded!" % name)
4378       cfg.SetDiskID(new_drbd, instance.primary_node)
4379       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4380       if is_degr:
4381         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4382
4383     for name in iv_names:
4384       dev, child, new_drbd = iv_names[name]
4385       logger.Info("remove mirror %s component" % name)
4386       cfg.SetDiskID(dev, instance.primary_node)
4387       if not rpc.call_blockdev_removechildren(instance.primary_node,
4388                                               dev, [child]):
4389         logger.Error("Can't remove child from mirror, aborting"
4390                      " *this device cleanup*.\nYou need to cleanup manually!!")
4391         continue
4392
4393       for node in child.logical_id[:2]:
4394         logger.Info("remove child device on %s" % node)
4395         cfg.SetDiskID(child, node)
4396         if not rpc.call_blockdev_remove(node, child):
4397           logger.Error("Warning: failed to remove device from node %s,"
4398                        " continuing operation." % node)
4399
4400       dev.children.remove(child)
4401
4402       cfg.AddInstance(instance)
4403
4404   def _ExecD8DiskOnly(self, feedback_fn):
4405     """Replace a disk on the primary or secondary for dbrd8.
4406
4407     The algorithm for replace is quite complicated:
4408       - for each disk to be replaced:
4409         - create new LVs on the target node with unique names
4410         - detach old LVs from the drbd device
4411         - rename old LVs to name_replaced.<time_t>
4412         - rename new LVs to old LVs
4413         - attach the new LVs (with the old names now) to the drbd device
4414       - wait for sync across all devices
4415       - for each modified disk:
4416         - remove old LVs (which have the name name_replaces.<time_t>)
4417
4418     Failures are not very well handled.
4419
4420     """
4421     steps_total = 6
4422     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4423     instance = self.instance
4424     iv_names = {}
4425     vgname = self.cfg.GetVGName()
4426     # start of work
4427     cfg = self.cfg
4428     tgt_node = self.tgt_node
4429     oth_node = self.oth_node
4430
4431     # Step: check device activation
4432     self.proc.LogStep(1, steps_total, "check device existence")
4433     info("checking volume groups")
4434     my_vg = cfg.GetVGName()
4435     results = rpc.call_vg_list([oth_node, tgt_node])
4436     if not results:
4437       raise errors.OpExecError("Can't list volume groups on the nodes")
4438     for node in oth_node, tgt_node:
4439       res = results.get(node, False)
4440       if not res or my_vg not in res:
4441         raise errors.OpExecError("Volume group '%s' not found on %s" %
4442                                  (my_vg, node))
4443     for dev in instance.disks:
4444       if not dev.iv_name in self.op.disks:
4445         continue
4446       for node in tgt_node, oth_node:
4447         info("checking %s on %s" % (dev.iv_name, node))
4448         cfg.SetDiskID(dev, node)
4449         if not rpc.call_blockdev_find(node, dev):
4450           raise errors.OpExecError("Can't find device %s on node %s" %
4451                                    (dev.iv_name, node))
4452
4453     # Step: check other node consistency
4454     self.proc.LogStep(2, steps_total, "check peer consistency")
4455     for dev in instance.disks:
4456       if not dev.iv_name in self.op.disks:
4457         continue
4458       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4459       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4460                                    oth_node==instance.primary_node):
4461         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4462                                  " to replace disks on this node (%s)" %
4463                                  (oth_node, tgt_node))
4464
4465     # Step: create new storage
4466     self.proc.LogStep(3, steps_total, "allocate new storage")
4467     for dev in instance.disks:
4468       if not dev.iv_name in self.op.disks:
4469         continue
4470       size = dev.size
4471       cfg.SetDiskID(dev, tgt_node)
4472       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4473       names = _GenerateUniqueNames(cfg, lv_names)
4474       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4475                              logical_id=(vgname, names[0]))
4476       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4477                              logical_id=(vgname, names[1]))
4478       new_lvs = [lv_data, lv_meta]
4479       old_lvs = dev.children
4480       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4481       info("creating new local storage on %s for %s" %
4482            (tgt_node, dev.iv_name))
4483       # since we *always* want to create this LV, we use the
4484       # _Create...OnPrimary (which forces the creation), even if we
4485       # are talking about the secondary node
4486       for new_lv in new_lvs:
4487         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4488                                         _GetInstanceInfoText(instance)):
4489           raise errors.OpExecError("Failed to create new LV named '%s' on"
4490                                    " node '%s'" %
4491                                    (new_lv.logical_id[1], tgt_node))
4492
4493     # Step: for each lv, detach+rename*2+attach
4494     self.proc.LogStep(4, steps_total, "change drbd configuration")
4495     for dev, old_lvs, new_lvs in iv_names.itervalues():
4496       info("detaching %s drbd from local storage" % dev.iv_name)
4497       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4498         raise errors.OpExecError("Can't detach drbd from local storage on node"
4499                                  " %s for device %s" % (tgt_node, dev.iv_name))
4500       #dev.children = []
4501       #cfg.Update(instance)
4502
4503       # ok, we created the new LVs, so now we know we have the needed
4504       # storage; as such, we proceed on the target node to rename
4505       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4506       # using the assumption that logical_id == physical_id (which in
4507       # turn is the unique_id on that node)
4508
4509       # FIXME(iustin): use a better name for the replaced LVs
4510       temp_suffix = int(time.time())
4511       ren_fn = lambda d, suff: (d.physical_id[0],
4512                                 d.physical_id[1] + "_replaced-%s" % suff)
4513       # build the rename list based on what LVs exist on the node
4514       rlist = []
4515       for to_ren in old_lvs:
4516         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4517         if find_res is not None: # device exists
4518           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4519
4520       info("renaming the old LVs on the target node")
4521       if not rpc.call_blockdev_rename(tgt_node, rlist):
4522         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4523       # now we rename the new LVs to the old LVs
4524       info("renaming the new LVs on the target node")
4525       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4526       if not rpc.call_blockdev_rename(tgt_node, rlist):
4527         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4528
4529       for old, new in zip(old_lvs, new_lvs):
4530         new.logical_id = old.logical_id
4531         cfg.SetDiskID(new, tgt_node)
4532
4533       for disk in old_lvs:
4534         disk.logical_id = ren_fn(disk, temp_suffix)
4535         cfg.SetDiskID(disk, tgt_node)
4536
4537       # now that the new lvs have the old name, we can add them to the device
4538       info("adding new mirror component on %s" % tgt_node)
4539       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4540         for new_lv in new_lvs:
4541           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4542             warning("Can't rollback device %s", hint="manually cleanup unused"
4543                     " logical volumes")
4544         raise errors.OpExecError("Can't add local storage to drbd")
4545
4546       dev.children = new_lvs
4547       cfg.Update(instance)
4548
4549     # Step: wait for sync
4550
4551     # this can fail as the old devices are degraded and _WaitForSync
4552     # does a combined result over all disks, so we don't check its
4553     # return value
4554     self.proc.LogStep(5, steps_total, "sync devices")
4555     _WaitForSync(cfg, instance, self.proc, unlock=True)
4556
4557     # so check manually all the devices
4558     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4559       cfg.SetDiskID(dev, instance.primary_node)
4560       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4561       if is_degr:
4562         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4563
4564     # Step: remove old storage
4565     self.proc.LogStep(6, steps_total, "removing old storage")
4566     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4567       info("remove logical volumes for %s" % name)
4568       for lv in old_lvs:
4569         cfg.SetDiskID(lv, tgt_node)
4570         if not rpc.call_blockdev_remove(tgt_node, lv):
4571           warning("Can't remove old LV", hint="manually remove unused LVs")
4572           continue
4573
4574   def _ExecD8Secondary(self, feedback_fn):
4575     """Replace the secondary node for drbd8.
4576
4577     The algorithm for replace is quite complicated:
4578       - for all disks of the instance:
4579         - create new LVs on the new node with same names
4580         - shutdown the drbd device on the old secondary
4581         - disconnect the drbd network on the primary
4582         - create the drbd device on the new secondary
4583         - network attach the drbd on the primary, using an artifice:
4584           the drbd code for Attach() will connect to the network if it
4585           finds a device which is connected to the good local disks but
4586           not network enabled
4587       - wait for sync across all devices
4588       - remove all disks from the old secondary
4589
4590     Failures are not very well handled.
4591
4592     """
4593     steps_total = 6
4594     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4595     instance = self.instance
4596     iv_names = {}
4597     vgname = self.cfg.GetVGName()
4598     # start of work
4599     cfg = self.cfg
4600     old_node = self.tgt_node
4601     new_node = self.new_node
4602     pri_node = instance.primary_node
4603
4604     # Step: check device activation
4605     self.proc.LogStep(1, steps_total, "check device existence")
4606     info("checking volume groups")
4607     my_vg = cfg.GetVGName()
4608     results = rpc.call_vg_list([pri_node, new_node])
4609     if not results:
4610       raise errors.OpExecError("Can't list volume groups on the nodes")
4611     for node in pri_node, new_node:
4612       res = results.get(node, False)
4613       if not res or my_vg not in res:
4614         raise errors.OpExecError("Volume group '%s' not found on %s" %
4615                                  (my_vg, node))
4616     for dev in instance.disks:
4617       if not dev.iv_name in self.op.disks:
4618         continue
4619       info("checking %s on %s" % (dev.iv_name, pri_node))
4620       cfg.SetDiskID(dev, pri_node)
4621       if not rpc.call_blockdev_find(pri_node, dev):
4622         raise errors.OpExecError("Can't find device %s on node %s" %
4623                                  (dev.iv_name, pri_node))
4624
4625     # Step: check other node consistency
4626     self.proc.LogStep(2, steps_total, "check peer consistency")
4627     for dev in instance.disks:
4628       if not dev.iv_name in self.op.disks:
4629         continue
4630       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4631       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4632         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4633                                  " unsafe to replace the secondary" %
4634                                  pri_node)
4635
4636     # Step: create new storage
4637     self.proc.LogStep(3, steps_total, "allocate new storage")
4638     for dev in instance.disks:
4639       size = dev.size
4640       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4641       # since we *always* want to create this LV, we use the
4642       # _Create...OnPrimary (which forces the creation), even if we
4643       # are talking about the secondary node
4644       for new_lv in dev.children:
4645         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4646                                         _GetInstanceInfoText(instance)):
4647           raise errors.OpExecError("Failed to create new LV named '%s' on"
4648                                    " node '%s'" %
4649                                    (new_lv.logical_id[1], new_node))
4650
4651       iv_names[dev.iv_name] = (dev, dev.children)
4652
4653     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4654     for dev in instance.disks:
4655       size = dev.size
4656       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4657       # create new devices on new_node
4658       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4659                               logical_id=(pri_node, new_node,
4660                                           dev.logical_id[2]),
4661                               children=dev.children)
4662       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4663                                         new_drbd, False,
4664                                       _GetInstanceInfoText(instance)):
4665         raise errors.OpExecError("Failed to create new DRBD on"
4666                                  " node '%s'" % new_node)
4667
4668     for dev in instance.disks:
4669       # we have new devices, shutdown the drbd on the old secondary
4670       info("shutting down drbd for %s on old node" % dev.iv_name)
4671       cfg.SetDiskID(dev, old_node)
4672       if not rpc.call_blockdev_shutdown(old_node, dev):
4673         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4674                 hint="Please cleanup this device manually as soon as possible")
4675
4676     info("detaching primary drbds from the network (=> standalone)")
4677     done = 0
4678     for dev in instance.disks:
4679       cfg.SetDiskID(dev, pri_node)
4680       # set the physical (unique in bdev terms) id to None, meaning
4681       # detach from network
4682       dev.physical_id = (None,) * len(dev.physical_id)
4683       # and 'find' the device, which will 'fix' it to match the
4684       # standalone state
4685       if rpc.call_blockdev_find(pri_node, dev):
4686         done += 1
4687       else:
4688         warning("Failed to detach drbd %s from network, unusual case" %
4689                 dev.iv_name)
4690
4691     if not done:
4692       # no detaches succeeded (very unlikely)
4693       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4694
4695     # if we managed to detach at least one, we update all the disks of
4696     # the instance to point to the new secondary
4697     info("updating instance configuration")
4698     for dev in instance.disks:
4699       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4700       cfg.SetDiskID(dev, pri_node)
4701     cfg.Update(instance)
4702
4703     # and now perform the drbd attach
4704     info("attaching primary drbds to new secondary (standalone => connected)")
4705     failures = []
4706     for dev in instance.disks:
4707       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4708       # since the attach is smart, it's enough to 'find' the device,
4709       # it will automatically activate the network, if the physical_id
4710       # is correct
4711       cfg.SetDiskID(dev, pri_node)
4712       if not rpc.call_blockdev_find(pri_node, dev):
4713         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4714                 "please do a gnt-instance info to see the status of disks")
4715
4716     # this can fail as the old devices are degraded and _WaitForSync
4717     # does a combined result over all disks, so we don't check its
4718     # return value
4719     self.proc.LogStep(5, steps_total, "sync devices")
4720     _WaitForSync(cfg, instance, self.proc, unlock=True)
4721
4722     # so check manually all the devices
4723     for name, (dev, old_lvs) in iv_names.iteritems():
4724       cfg.SetDiskID(dev, pri_node)
4725       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4726       if is_degr:
4727         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4728
4729     self.proc.LogStep(6, steps_total, "removing old storage")
4730     for name, (dev, old_lvs) in iv_names.iteritems():
4731       info("remove logical volumes for %s" % name)
4732       for lv in old_lvs:
4733         cfg.SetDiskID(lv, old_node)
4734         if not rpc.call_blockdev_remove(old_node, lv):
4735           warning("Can't remove LV on old secondary",
4736                   hint="Cleanup stale volumes by hand")
4737
4738   def Exec(self, feedback_fn):
4739     """Execute disk replacement.
4740
4741     This dispatches the disk replacement to the appropriate handler.
4742
4743     """
4744     instance = self.instance
4745
4746     # Activate the instance disks if we're replacing them on a down instance
4747     if instance.status == "down":
4748       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4749       self.proc.ChainOpCode(op)
4750
4751     if instance.disk_template == constants.DT_REMOTE_RAID1:
4752       fn = self._ExecRR1
4753     elif instance.disk_template == constants.DT_DRBD8:
4754       if self.op.remote_node is None:
4755         fn = self._ExecD8DiskOnly
4756       else:
4757         fn = self._ExecD8Secondary
4758     else:
4759       raise errors.ProgrammerError("Unhandled disk replacement case")
4760
4761     ret = fn(feedback_fn)
4762
4763     # Deactivate the instance disks if we're replacing them on a down instance
4764     if instance.status == "down":
4765       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4766       self.proc.ChainOpCode(op)
4767
4768     return ret
4769
4770
4771 class LUGrowDisk(LogicalUnit):
4772   """Grow a disk of an instance.
4773
4774   """
4775   HPATH = "disk-grow"
4776   HTYPE = constants.HTYPE_INSTANCE
4777   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4778
4779   def BuildHooksEnv(self):
4780     """Build hooks env.
4781
4782     This runs on the master, the primary and all the secondaries.
4783
4784     """
4785     env = {
4786       "DISK": self.op.disk,
4787       "AMOUNT": self.op.amount,
4788       }
4789     env.update(_BuildInstanceHookEnvByObject(self.instance))
4790     nl = [
4791       self.sstore.GetMasterNode(),
4792       self.instance.primary_node,
4793       ]
4794     return env, nl, nl
4795
4796   def CheckPrereq(self):
4797     """Check prerequisites.
4798
4799     This checks that the instance is in the cluster.
4800
4801     """
4802     instance = self.cfg.GetInstanceInfo(
4803       self.cfg.ExpandInstanceName(self.op.instance_name))
4804     if instance is None:
4805       raise errors.OpPrereqError("Instance '%s' not known" %
4806                                  self.op.instance_name)
4807
4808     if self.op.amount <= 0:
4809       raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4810
4811     self.instance = instance
4812     self.op.instance_name = instance.name
4813
4814     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4815       raise errors.OpPrereqError("Instance's disk layout does not support"
4816                                  " growing.")
4817
4818     self.disk = instance.FindDisk(self.op.disk)
4819     if self.disk is None:
4820       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4821                                  (self.op.disk, instance.name))
4822
4823     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4824     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4825     for node in nodenames:
4826       info = nodeinfo.get(node, None)
4827       if not info:
4828         raise errors.OpPrereqError("Cannot get current information"
4829                                    " from node '%s'" % node)
4830       vg_free = info.get('vg_free', None)
4831       if not isinstance(vg_free, int):
4832         raise errors.OpPrereqError("Can't compute free disk space on"
4833                                    " node %s" % node)
4834       if self.op.amount > info['vg_free']:
4835         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4836                                    " %d MiB available, %d MiB required" %
4837                                    (node, info['vg_free'], self.op.amount))
4838       is_primary = (node == instance.primary_node)
4839       if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4840         raise errors.OpPrereqError("Disk %s is degraded or not fully"
4841                                  " synchronized on node %s,"
4842                                  " aborting grow." % (self.op.disk, node))
4843
4844   def Exec(self, feedback_fn):
4845     """Execute disk grow.
4846
4847     """
4848     instance = self.instance
4849     disk = self.disk
4850     for node in (instance.secondary_nodes + (instance.primary_node,)):
4851       self.cfg.SetDiskID(disk, node)
4852       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4853       if not result or not isinstance(result, tuple) or len(result) != 2:
4854         raise errors.OpExecError("grow request failed to node %s" % node)
4855       elif not result[0]:
4856         raise errors.OpExecError("grow request failed to node %s: %s" %
4857                                  (node, result[1]))
4858     disk.RecordGrow(self.op.amount)
4859     self.cfg.Update(instance)
4860     if self.op.wait_for_sync:
4861       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4862       if disk_abort:
4863         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4864                      " Please check the instance.")
4865
4866
4867 class LUQueryInstanceData(NoHooksLU):
4868   """Query runtime instance data.
4869
4870   """
4871   _OP_REQP = ["instances", "static"]
4872
4873   def CheckPrereq(self):
4874     """Check prerequisites.
4875
4876     This only checks the optional instance list against the existing names.
4877
4878     """
4879     if not isinstance(self.op.instances, list):
4880       raise errors.OpPrereqError("Invalid argument type 'instances'")
4881     if self.op.instances:
4882       self.wanted_instances = []
4883       names = self.op.instances
4884       for name in names:
4885         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4886         if instance is None:
4887           raise errors.OpPrereqError("No such instance name '%s'" % name)
4888         self.wanted_instances.append(instance)
4889     else:
4890       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4891                                in self.cfg.GetInstanceList()]
4892     return
4893
4894
4895   def _ComputeDiskStatus(self, instance, snode, dev):
4896     """Compute block device status.
4897
4898     """
4899     static = self.op.static
4900     if not static:
4901       self.cfg.SetDiskID(dev, instance.primary_node)
4902       dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4903     else:
4904       dev_pstatus = None
4905
4906     if dev.dev_type in constants.LDS_DRBD:
4907       # we change the snode then (otherwise we use the one passed in)
4908       if dev.logical_id[0] == instance.primary_node:
4909         snode = dev.logical_id[1]
4910       else:
4911         snode = dev.logical_id[0]
4912
4913     if snode and not static:
4914       self.cfg.SetDiskID(dev, snode)
4915       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4916     else:
4917       dev_sstatus = None
4918
4919     if dev.children:
4920       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4921                       for child in dev.children]
4922     else:
4923       dev_children = []
4924
4925     data = {
4926       "iv_name": dev.iv_name,
4927       "dev_type": dev.dev_type,
4928       "logical_id": dev.logical_id,
4929       "physical_id": dev.physical_id,
4930       "pstatus": dev_pstatus,
4931       "sstatus": dev_sstatus,
4932       "children": dev_children,
4933       }
4934
4935     return data
4936
4937   def Exec(self, feedback_fn):
4938     """Gather and return data"""
4939     result = {}
4940     for instance in self.wanted_instances:
4941       if not self.op.static:
4942         remote_info = rpc.call_instance_info(instance.primary_node,
4943                                                   instance.name)
4944         if remote_info and "state" in remote_info:
4945           remote_state = "up"
4946         else:
4947           remote_state = "down"
4948       else:
4949         remote_state = None
4950       if instance.status == "down":
4951         config_state = "down"
4952       else:
4953         config_state = "up"
4954
4955       disks = [self._ComputeDiskStatus(instance, None, device)
4956                for device in instance.disks]
4957
4958       idict = {
4959         "name": instance.name,
4960         "config_state": config_state,
4961         "run_state": remote_state,
4962         "pnode": instance.primary_node,
4963         "snodes": instance.secondary_nodes,
4964         "os": instance.os,
4965         "memory": instance.memory,
4966         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4967         "disks": disks,
4968         "vcpus": instance.vcpus,
4969         "auto_balance": instance.auto_balance,
4970         }
4971
4972       htkind = self.sstore.GetHypervisorType()
4973       if htkind == constants.HT_XEN_PVM30:
4974         idict["kernel_path"] = instance.kernel_path
4975         idict["initrd_path"] = instance.initrd_path
4976
4977       if htkind == constants.HT_XEN_HVM31:
4978         idict["hvm_boot_order"] = instance.hvm_boot_order
4979         idict["hvm_acpi"] = instance.hvm_acpi
4980         idict["hvm_pae"] = instance.hvm_pae
4981         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4982         idict["hvm_nic_type"] = instance.hvm_nic_type
4983         idict["hvm_disk_type"] = instance.hvm_disk_type
4984
4985       if htkind in constants.HTS_REQ_PORT:
4986         if instance.network_port is None:
4987           vnc_console_port = None
4988         elif instance.vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4989           vnc_console_port = "%s:%s" % (instance.primary_node,
4990                                        instance.network_port)
4991         elif instance.vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4992           vnc_console_port = "%s:%s on node %s" % (instance.vnc_bind_address,
4993                                                    instance.network_port,
4994                                                    instance.primary_node)
4995         else:
4996           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4997                                         instance.network_port)
4998         idict["vnc_console_port"] = vnc_console_port
4999         idict["vnc_bind_address"] = instance.vnc_bind_address
5000         idict["network_port"] = instance.network_port
5001
5002       result[instance.name] = idict
5003
5004     return result
5005
5006
5007 class LUSetInstanceParms(LogicalUnit):
5008   """Modifies an instances's parameters.
5009
5010   """
5011   HPATH = "instance-modify"
5012   HTYPE = constants.HTYPE_INSTANCE
5013   _OP_REQP = ["instance_name"]
5014
5015   def BuildHooksEnv(self):
5016     """Build hooks env.
5017
5018     This runs on the master, primary and secondaries.
5019
5020     """
5021     args = dict()
5022     if self.mem:
5023       args['memory'] = self.mem
5024     if self.vcpus:
5025       args['vcpus'] = self.vcpus
5026     if self.do_ip or self.do_bridge or self.mac:
5027       if self.do_ip:
5028         ip = self.ip
5029       else:
5030         ip = self.instance.nics[0].ip
5031       if self.bridge:
5032         bridge = self.bridge
5033       else:
5034         bridge = self.instance.nics[0].bridge
5035       if self.mac:
5036         mac = self.mac
5037       else:
5038         mac = self.instance.nics[0].mac
5039       args['nics'] = [(ip, bridge, mac)]
5040     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
5041     nl = [self.sstore.GetMasterNode(),
5042           self.instance.primary_node] + list(self.instance.secondary_nodes)
5043     return env, nl, nl
5044
5045   def CheckPrereq(self):
5046     """Check prerequisites.
5047
5048     This only checks the instance list against the existing names.
5049
5050     """
5051     self.mem = getattr(self.op, "mem", None)
5052     self.vcpus = getattr(self.op, "vcpus", None)
5053     self.ip = getattr(self.op, "ip", None)
5054     self.mac = getattr(self.op, "mac", None)
5055     self.bridge = getattr(self.op, "bridge", None)
5056     self.kernel_path = getattr(self.op, "kernel_path", None)
5057     self.initrd_path = getattr(self.op, "initrd_path", None)
5058     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
5059     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
5060     self.hvm_pae = getattr(self.op, "hvm_pae", None)
5061     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
5062     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
5063     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
5064     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
5065     self.force = getattr(self.op, "force", None)
5066     self.auto_balance = getattr(self.op, "auto_balance", None)
5067     all_parms = [
5068       self.mem, self.vcpus, self.ip, self.bridge, self.mac,
5069       self.kernel_path, self.initrd_path, self.hvm_boot_order,
5070       self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
5071       self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type,
5072       self.auto_balance,
5073       ]
5074     if all_parms.count(None) == len(all_parms):
5075       raise errors.OpPrereqError("No changes submitted")
5076     if self.mem is not None:
5077       try:
5078         self.mem = int(self.mem)
5079       except ValueError, err:
5080         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
5081     if self.vcpus is not None:
5082       try:
5083         self.vcpus = int(self.vcpus)
5084       except ValueError, err:
5085         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
5086     if self.ip is not None:
5087       self.do_ip = True
5088       if self.ip.lower() == "none":
5089         self.ip = None
5090       else:
5091         if not utils.IsValidIP(self.ip):
5092           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
5093     else:
5094       self.do_ip = False
5095     self.do_bridge = (self.bridge is not None)
5096     if self.mac is not None:
5097       if self.cfg.IsMacInUse(self.mac):
5098         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
5099                                    self.mac)
5100       if not utils.IsValidMac(self.mac):
5101         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
5102
5103     if self.kernel_path is not None:
5104       self.do_kernel_path = True
5105       if self.kernel_path == constants.VALUE_NONE:
5106         raise errors.OpPrereqError("Can't set instance to no kernel")
5107
5108       if self.kernel_path != constants.VALUE_DEFAULT:
5109         if not os.path.isabs(self.kernel_path):
5110           raise errors.OpPrereqError("The kernel path must be an absolute"
5111                                     " filename")
5112     else:
5113       self.do_kernel_path = False
5114
5115     if self.initrd_path is not None:
5116       self.do_initrd_path = True
5117       if self.initrd_path not in (constants.VALUE_NONE,
5118                                   constants.VALUE_DEFAULT):
5119         if not os.path.isabs(self.initrd_path):
5120           raise errors.OpPrereqError("The initrd path must be an absolute"
5121                                     " filename")
5122     else:
5123       self.do_initrd_path = False
5124
5125     # boot order verification
5126     if self.hvm_boot_order is not None:
5127       if self.hvm_boot_order != constants.VALUE_DEFAULT:
5128         if len(self.hvm_boot_order.strip("acdn")) != 0:
5129           raise errors.OpPrereqError("invalid boot order specified,"
5130                                      " must be one or more of [acdn]"
5131                                      " or 'default'")
5132
5133     # hvm_cdrom_image_path verification
5134     if self.op.hvm_cdrom_image_path is not None:
5135       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
5136               self.op.hvm_cdrom_image_path.lower() == "none"):
5137         raise errors.OpPrereqError("The path to the HVM CDROM image must"
5138                                    " be an absolute path or None, not %s" %
5139                                    self.op.hvm_cdrom_image_path)
5140       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
5141               self.op.hvm_cdrom_image_path.lower() == "none"):
5142         raise errors.OpPrereqError("The HVM CDROM image must either be a"
5143                                    " regular file or a symlink pointing to"
5144                                    " an existing regular file, not %s" %
5145                                    self.op.hvm_cdrom_image_path)
5146
5147     # vnc_bind_address verification
5148     if self.op.vnc_bind_address is not None:
5149       if not utils.IsValidIP(self.op.vnc_bind_address):
5150         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
5151                                    " like a valid IP address" %
5152                                    self.op.vnc_bind_address)
5153
5154     # Xen HVM device type checks
5155     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
5156       if self.op.hvm_nic_type is not None:
5157         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
5158           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
5159                                      " HVM  hypervisor" % self.op.hvm_nic_type)
5160       if self.op.hvm_disk_type is not None:
5161         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
5162           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
5163                                      " HVM hypervisor" % self.op.hvm_disk_type)
5164
5165     # auto balance setting
5166     if self.auto_balance is not None:
5167       # convert the value to a proper bool value, if it's not
5168       self.auto_balance = bool(self.auto_balance)
5169
5170     instance = self.cfg.GetInstanceInfo(
5171       self.cfg.ExpandInstanceName(self.op.instance_name))
5172     if instance is None:
5173       raise errors.OpPrereqError("No such instance name '%s'" %
5174                                  self.op.instance_name)
5175     self.op.instance_name = instance.name
5176     self.instance = instance
5177     self.warn = []
5178     if self.mem is not None and not self.force:
5179       pnode = self.instance.primary_node
5180       nodelist = [pnode]
5181       if instance.auto_balance:
5182         nodelist.extend(instance.secondary_nodes)
5183       instance_info = rpc.call_instance_info(pnode, instance.name)
5184       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
5185
5186       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
5187         # Assume the primary node is unreachable and go ahead
5188         self.warn.append("Can't get info from primary node %s" % pnode)
5189       else:
5190         if instance_info:
5191           current_mem = instance_info['memory']
5192         else:
5193           # Assume instance not running
5194           # (there is a slight race condition here, but it's not very probable,
5195           # and we have no other way to check)
5196           current_mem = 0
5197         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
5198         if miss_mem > 0:
5199           raise errors.OpPrereqError("This change will prevent the instance"
5200                                      " from starting, due to %d MB of memory"
5201                                      " missing on its primary node" % miss_mem)
5202
5203       if instance.auto_balance:
5204         for node in instance.secondary_nodes:
5205           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
5206             self.warn.append("Can't get info from secondary node %s" % node)
5207           elif self.mem > nodeinfo[node]['memory_free']:
5208             self.warn.append("Not enough memory to failover instance to"
5209                              " secondary node %s" % node)
5210     return
5211
5212   def Exec(self, feedback_fn):
5213     """Modifies an instance.
5214
5215     All parameters take effect only at the next restart of the instance.
5216     """
5217     # Process here the warnings from CheckPrereq, as we don't have a
5218     # feedback_fn there.
5219     for warn in self.warn:
5220       feedback_fn("WARNING: %s" % warn)
5221
5222     result = []
5223     instance = self.instance
5224     if self.mem:
5225       instance.memory = self.mem
5226       result.append(("mem", self.mem))
5227     if self.vcpus:
5228       instance.vcpus = self.vcpus
5229       result.append(("vcpus",  self.vcpus))
5230     if self.do_ip:
5231       instance.nics[0].ip = self.ip
5232       result.append(("ip", self.ip))
5233     if self.bridge:
5234       instance.nics[0].bridge = self.bridge
5235       result.append(("bridge", self.bridge))
5236     if self.mac:
5237       instance.nics[0].mac = self.mac
5238       result.append(("mac", self.mac))
5239     if self.do_kernel_path:
5240       instance.kernel_path = self.kernel_path
5241       result.append(("kernel_path", self.kernel_path))
5242     if self.do_initrd_path:
5243       instance.initrd_path = self.initrd_path
5244       result.append(("initrd_path", self.initrd_path))
5245     if self.hvm_boot_order:
5246       if self.hvm_boot_order == constants.VALUE_DEFAULT:
5247         instance.hvm_boot_order = None
5248       else:
5249         instance.hvm_boot_order = self.hvm_boot_order
5250       result.append(("hvm_boot_order", self.hvm_boot_order))
5251     if self.hvm_acpi is not None:
5252       instance.hvm_acpi = self.hvm_acpi
5253       result.append(("hvm_acpi", self.hvm_acpi))
5254     if self.hvm_pae is not None:
5255       instance.hvm_pae = self.hvm_pae
5256       result.append(("hvm_pae", self.hvm_pae))
5257     if self.hvm_nic_type is not None:
5258       instance.hvm_nic_type = self.hvm_nic_type
5259       result.append(("hvm_nic_type", self.hvm_nic_type))
5260     if self.hvm_disk_type is not None:
5261       instance.hvm_disk_type = self.hvm_disk_type
5262       result.append(("hvm_disk_type", self.hvm_disk_type))
5263     if self.hvm_cdrom_image_path:
5264       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
5265         instance.hvm_cdrom_image_path = None
5266       else:
5267         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
5268       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
5269     if self.vnc_bind_address:
5270       instance.vnc_bind_address = self.vnc_bind_address
5271       result.append(("vnc_bind_address", self.vnc_bind_address))
5272     if self.auto_balance is not None:
5273       instance.auto_balance = self.auto_balance
5274       result.append(("auto_balance", self.auto_balance))
5275
5276     self.cfg.AddInstance(instance)
5277
5278     return result
5279
5280
5281 class LUQueryExports(NoHooksLU):
5282   """Query the exports list
5283
5284   """
5285   _OP_REQP = []
5286
5287   def CheckPrereq(self):
5288     """Check that the nodelist contains only existing nodes.
5289
5290     """
5291     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5292
5293   def Exec(self, feedback_fn):
5294     """Compute the list of all the exported system images.
5295
5296     Returns:
5297       a dictionary with the structure node->(export-list)
5298       where export-list is a list of the instances exported on
5299       that node.
5300
5301     """
5302     return rpc.call_export_list(self.nodes)
5303
5304
5305 class LUExportInstance(LogicalUnit):
5306   """Export an instance to an image in the cluster.
5307
5308   """
5309   HPATH = "instance-export"
5310   HTYPE = constants.HTYPE_INSTANCE
5311   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5312
5313   def BuildHooksEnv(self):
5314     """Build hooks env.
5315
5316     This will run on the master, primary node and target node.
5317
5318     """
5319     env = {
5320       "EXPORT_NODE": self.op.target_node,
5321       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5322       }
5323     env.update(_BuildInstanceHookEnvByObject(self.instance))
5324     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5325           self.op.target_node]
5326     return env, nl, nl
5327
5328   def CheckPrereq(self):
5329     """Check prerequisites.
5330
5331     This checks that the instance and node names are valid.
5332
5333     """
5334     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5335     self.instance = self.cfg.GetInstanceInfo(instance_name)
5336     if self.instance is None:
5337       raise errors.OpPrereqError("Instance '%s' not found" %
5338                                  self.op.instance_name)
5339
5340     # node verification
5341     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5342     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5343
5344     if self.dst_node is None:
5345       raise errors.OpPrereqError("Destination node '%s' is unknown." %
5346                                  self.op.target_node)
5347     self.op.target_node = self.dst_node.name
5348
5349   def Exec(self, feedback_fn):
5350     """Export an instance to an image in the cluster.
5351
5352     """
5353     instance = self.instance
5354     dst_node = self.dst_node
5355     src_node = instance.primary_node
5356     if self.op.shutdown:
5357       # shutdown the instance, but not the disks
5358       if not rpc.call_instance_shutdown(src_node, instance):
5359         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5360                                  (instance.name, src_node))
5361
5362     vgname = self.cfg.GetVGName()
5363
5364     snap_disks = []
5365
5366     try:
5367       for disk in instance.disks:
5368         if disk.iv_name == "sda":
5369           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5370           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5371
5372           if not new_dev_name:
5373             logger.Error("could not snapshot block device %s on node %s" %
5374                          (disk.logical_id[1], src_node))
5375           else:
5376             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5377                                       logical_id=(vgname, new_dev_name),
5378                                       physical_id=(vgname, new_dev_name),
5379                                       iv_name=disk.iv_name)
5380             snap_disks.append(new_dev)
5381
5382     finally:
5383       if self.op.shutdown and instance.status == "up":
5384         if not rpc.call_instance_start(src_node, instance, None):
5385           _ShutdownInstanceDisks(instance, self.cfg)
5386           raise errors.OpExecError("Could not start instance")
5387
5388     # TODO: check for size
5389
5390     for dev in snap_disks:
5391       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5392                                            instance):
5393         logger.Error("could not export block device %s from node"
5394                      " %s to node %s" %
5395                      (dev.logical_id[1], src_node, dst_node.name))
5396       if not rpc.call_blockdev_remove(src_node, dev):
5397         logger.Error("could not remove snapshot block device %s from"
5398                      " node %s" % (dev.logical_id[1], src_node))
5399
5400     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5401       logger.Error("could not finalize export for instance %s on node %s" %
5402                    (instance.name, dst_node.name))
5403
5404     nodelist = self.cfg.GetNodeList()
5405     nodelist.remove(dst_node.name)
5406
5407     # on one-node clusters nodelist will be empty after the removal
5408     # if we proceed the backup would be removed because OpQueryExports
5409     # substitutes an empty list with the full cluster node list.
5410     if nodelist:
5411       op = opcodes.OpQueryExports(nodes=nodelist)
5412       exportlist = self.proc.ChainOpCode(op)
5413       for node in exportlist:
5414         if instance.name in exportlist[node]:
5415           if not rpc.call_export_remove(node, instance.name):
5416             logger.Error("could not remove older export for instance %s"
5417                          " on node %s" % (instance.name, node))
5418
5419
5420 class LURemoveExport(NoHooksLU):
5421   """Remove exports related to the named instance.
5422
5423   """
5424   _OP_REQP = ["instance_name"]
5425
5426   def CheckPrereq(self):
5427     """Check prerequisites.
5428     """
5429     pass
5430
5431   def Exec(self, feedback_fn):
5432     """Remove any export.
5433
5434     """
5435     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5436     # If the instance was not found we'll try with the name that was passed in.
5437     # This will only work if it was an FQDN, though.
5438     fqdn_warn = False
5439     if not instance_name:
5440       fqdn_warn = True
5441       instance_name = self.op.instance_name
5442
5443     op = opcodes.OpQueryExports(nodes=[])
5444     exportlist = self.proc.ChainOpCode(op)
5445     found = False
5446     for node in exportlist:
5447       if instance_name in exportlist[node]:
5448         found = True
5449         if not rpc.call_export_remove(node, instance_name):
5450           logger.Error("could not remove export for instance %s"
5451                        " on node %s" % (instance_name, node))
5452
5453     if fqdn_warn and not found:
5454       feedback_fn("Export not found. If trying to remove an export belonging"
5455                   " to a deleted instance please use its Fully Qualified"
5456                   " Domain Name.")
5457
5458
5459 class TagsLU(NoHooksLU):
5460   """Generic tags LU.
5461
5462   This is an abstract class which is the parent of all the other tags LUs.
5463
5464   """
5465   def CheckPrereq(self):
5466     """Check prerequisites.
5467
5468     """
5469     if self.op.kind == constants.TAG_CLUSTER:
5470       self.target = self.cfg.GetClusterInfo()
5471     elif self.op.kind == constants.TAG_NODE:
5472       name = self.cfg.ExpandNodeName(self.op.name)
5473       if name is None:
5474         raise errors.OpPrereqError("Invalid node name (%s)" %
5475                                    (self.op.name,))
5476       self.op.name = name
5477       self.target = self.cfg.GetNodeInfo(name)
5478     elif self.op.kind == constants.TAG_INSTANCE:
5479       name = self.cfg.ExpandInstanceName(self.op.name)
5480       if name is None:
5481         raise errors.OpPrereqError("Invalid instance name (%s)" %
5482                                    (self.op.name,))
5483       self.op.name = name
5484       self.target = self.cfg.GetInstanceInfo(name)
5485     else:
5486       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5487                                  str(self.op.kind))
5488
5489
5490 class LUGetTags(TagsLU):
5491   """Returns the tags of a given object.
5492
5493   """
5494   _OP_REQP = ["kind", "name"]
5495
5496   def Exec(self, feedback_fn):
5497     """Returns the tag list.
5498
5499     """
5500     return self.target.GetTags()
5501
5502
5503 class LUSearchTags(NoHooksLU):
5504   """Searches the tags for a given pattern.
5505
5506   """
5507   _OP_REQP = ["pattern"]
5508
5509   def CheckPrereq(self):
5510     """Check prerequisites.
5511
5512     This checks the pattern passed for validity by compiling it.
5513
5514     """
5515     try:
5516       self.re = re.compile(self.op.pattern)
5517     except re.error, err:
5518       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5519                                  (self.op.pattern, err))
5520
5521   def Exec(self, feedback_fn):
5522     """Returns the tag list.
5523
5524     """
5525     cfg = self.cfg
5526     tgts = [("/cluster", cfg.GetClusterInfo())]
5527     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5528     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5529     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5530     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5531     results = []
5532     for path, target in tgts:
5533       for tag in target.GetTags():
5534         if self.re.search(tag):
5535           results.append((path, tag))
5536     return results
5537
5538
5539 class LUAddTags(TagsLU):
5540   """Sets a tag on a given object.
5541
5542   """
5543   _OP_REQP = ["kind", "name", "tags"]
5544
5545   def CheckPrereq(self):
5546     """Check prerequisites.
5547
5548     This checks the type and length of the tag name and value.
5549
5550     """
5551     TagsLU.CheckPrereq(self)
5552     for tag in self.op.tags:
5553       objects.TaggableObject.ValidateTag(tag)
5554
5555   def Exec(self, feedback_fn):
5556     """Sets the tag.
5557
5558     """
5559     try:
5560       for tag in self.op.tags:
5561         self.target.AddTag(tag)
5562     except errors.TagError, err:
5563       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5564     try:
5565       self.cfg.Update(self.target)
5566     except errors.ConfigurationError:
5567       raise errors.OpRetryError("There has been a modification to the"
5568                                 " config file and the operation has been"
5569                                 " aborted. Please retry.")
5570
5571
5572 class LUDelTags(TagsLU):
5573   """Delete a list of tags from a given object.
5574
5575   """
5576   _OP_REQP = ["kind", "name", "tags"]
5577
5578   def CheckPrereq(self):
5579     """Check prerequisites.
5580
5581     This checks that we have the given tag.
5582
5583     """
5584     TagsLU.CheckPrereq(self)
5585     for tag in self.op.tags:
5586       objects.TaggableObject.ValidateTag(tag, removal=True)
5587     del_tags = frozenset(self.op.tags)
5588     cur_tags = self.target.GetTags()
5589     if not del_tags <= cur_tags:
5590       diff_tags = del_tags - cur_tags
5591       diff_names = ["'%s'" % tag for tag in diff_tags]
5592       diff_names.sort()
5593       raise errors.OpPrereqError("Tag(s) %s not found" %
5594                                  (",".join(diff_names)))
5595
5596   def Exec(self, feedback_fn):
5597     """Remove the tag from the object.
5598
5599     """
5600     for tag in self.op.tags:
5601       self.target.RemoveTag(tag)
5602     try:
5603       self.cfg.Update(self.target)
5604     except errors.ConfigurationError:
5605       raise errors.OpRetryError("There has been a modification to the"
5606                                 " config file and the operation has been"
5607                                 " aborted. Please retry.")
5608
5609 class LUTestDelay(NoHooksLU):
5610   """Sleep for a specified amount of time.
5611
5612   This LU sleeps on the master and/or nodes for a specified amoutn of
5613   time.
5614
5615   """
5616   _OP_REQP = ["duration", "on_master", "on_nodes"]
5617
5618   def CheckPrereq(self):
5619     """Check prerequisites.
5620
5621     This checks that we have a good list of nodes and/or the duration
5622     is valid.
5623
5624     """
5625
5626     if self.op.on_nodes:
5627       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5628
5629   def Exec(self, feedback_fn):
5630     """Do the actual sleep.
5631
5632     """
5633     if self.op.on_master:
5634       if not utils.TestDelay(self.op.duration):
5635         raise errors.OpExecError("Error during master delay test")
5636     if self.op.on_nodes:
5637       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5638       if not result:
5639         raise errors.OpExecError("Complete failure from rpc call")
5640       for node, node_result in result.items():
5641         if not node_result:
5642           raise errors.OpExecError("Failure during rpc call to node %s,"
5643                                    " result: %s" % (node, node_result))
5644
5645
5646 class IAllocator(object):
5647   """IAllocator framework.
5648
5649   An IAllocator instance has three sets of attributes:
5650     - cfg/sstore that are needed to query the cluster
5651     - input data (all members of the _KEYS class attribute are required)
5652     - four buffer attributes (in|out_data|text), that represent the
5653       input (to the external script) in text and data structure format,
5654       and the output from it, again in two formats
5655     - the result variables from the script (success, info, nodes) for
5656       easy usage
5657
5658   """
5659   _ALLO_KEYS = [
5660     "mem_size", "disks", "disk_template",
5661     "os", "tags", "nics", "vcpus",
5662     ]
5663   _RELO_KEYS = [
5664     "relocate_from",
5665     ]
5666
5667   def __init__(self, cfg, sstore, mode, name, **kwargs):
5668     self.cfg = cfg
5669     self.sstore = sstore
5670     # init buffer variables
5671     self.in_text = self.out_text = self.in_data = self.out_data = None
5672     # init all input fields so that pylint is happy
5673     self.mode = mode
5674     self.name = name
5675     self.mem_size = self.disks = self.disk_template = None
5676     self.os = self.tags = self.nics = self.vcpus = None
5677     self.relocate_from = None
5678     # computed fields
5679     self.required_nodes = None
5680     # init result fields
5681     self.success = self.info = self.nodes = None
5682     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5683       keyset = self._ALLO_KEYS
5684     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5685       keyset = self._RELO_KEYS
5686     else:
5687       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5688                                    " IAllocator" % self.mode)
5689     for key in kwargs:
5690       if key not in keyset:
5691         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5692                                      " IAllocator" % key)
5693       setattr(self, key, kwargs[key])
5694     for key in keyset:
5695       if key not in kwargs:
5696         raise errors.ProgrammerError("Missing input parameter '%s' to"
5697                                      " IAllocator" % key)
5698     self._BuildInputData()
5699
5700   def _ComputeClusterData(self):
5701     """Compute the generic allocator input data.
5702
5703     This is the data that is independent of the actual operation.
5704
5705     """
5706     cfg = self.cfg
5707     # cluster data
5708     data = {
5709       "version": 1,
5710       "cluster_name": self.sstore.GetClusterName(),
5711       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5712       "hypervisor_type": self.sstore.GetHypervisorType(),
5713       # we don't have job IDs
5714       }
5715
5716     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5717
5718     # node data
5719     node_results = {}
5720     node_list = cfg.GetNodeList()
5721     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5722     for nname in node_list:
5723       ninfo = cfg.GetNodeInfo(nname)
5724       if nname not in node_data or not isinstance(node_data[nname], dict):
5725         raise errors.OpExecError("Can't get data for node %s" % nname)
5726       remote_info = node_data[nname]
5727       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5728                    'vg_size', 'vg_free', 'cpu_total']:
5729         if attr not in remote_info:
5730           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5731                                    (nname, attr))
5732         try:
5733           remote_info[attr] = int(remote_info[attr])
5734         except ValueError, err:
5735           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5736                                    " %s" % (nname, attr, str(err)))
5737       # compute memory used by primary instances
5738       i_p_mem = i_p_up_mem = 0
5739       for iinfo in i_list:
5740         if iinfo.primary_node == nname:
5741           i_p_mem += iinfo.memory
5742           if iinfo.status == "up":
5743             i_p_up_mem += iinfo.memory
5744
5745       # compute memory used by instances
5746       pnr = {
5747         "tags": list(ninfo.GetTags()),
5748         "total_memory": remote_info['memory_total'],
5749         "reserved_memory": remote_info['memory_dom0'],
5750         "free_memory": remote_info['memory_free'],
5751         "i_pri_memory": i_p_mem,
5752         "i_pri_up_memory": i_p_up_mem,
5753         "total_disk": remote_info['vg_size'],
5754         "free_disk": remote_info['vg_free'],
5755         "primary_ip": ninfo.primary_ip,
5756         "secondary_ip": ninfo.secondary_ip,
5757         "total_cpus": remote_info['cpu_total'],
5758         }
5759       node_results[nname] = pnr
5760     data["nodes"] = node_results
5761
5762     # instance data
5763     instance_data = {}
5764     for iinfo in i_list:
5765       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5766                   for n in iinfo.nics]
5767       pir = {
5768         "tags": list(iinfo.GetTags()),
5769         "should_run": iinfo.status == "up",
5770         "vcpus": iinfo.vcpus,
5771         "memory": iinfo.memory,
5772         "os": iinfo.os,
5773         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5774         "nics": nic_data,
5775         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5776         "disk_template": iinfo.disk_template,
5777         }
5778       instance_data[iinfo.name] = pir
5779
5780     data["instances"] = instance_data
5781
5782     self.in_data = data
5783
5784   def _AddNewInstance(self):
5785     """Add new instance data to allocator structure.
5786
5787     This in combination with _AllocatorGetClusterData will create the
5788     correct structure needed as input for the allocator.
5789
5790     The checks for the completeness of the opcode must have already been
5791     done.
5792
5793     """
5794     data = self.in_data
5795     if len(self.disks) != 2:
5796       raise errors.OpExecError("Only two-disk configurations supported")
5797
5798     disk_space = _ComputeDiskSize(self.disk_template,
5799                                   self.disks[0]["size"], self.disks[1]["size"])
5800
5801     if self.disk_template in constants.DTS_NET_MIRROR:
5802       self.required_nodes = 2
5803     else:
5804       self.required_nodes = 1
5805     request = {
5806       "type": "allocate",
5807       "name": self.name,
5808       "disk_template": self.disk_template,
5809       "tags": self.tags,
5810       "os": self.os,
5811       "vcpus": self.vcpus,
5812       "memory": self.mem_size,
5813       "disks": self.disks,
5814       "disk_space_total": disk_space,
5815       "nics": self.nics,
5816       "required_nodes": self.required_nodes,
5817       }
5818     data["request"] = request
5819
5820   def _AddRelocateInstance(self):
5821     """Add relocate instance data to allocator structure.
5822
5823     This in combination with _IAllocatorGetClusterData will create the
5824     correct structure needed as input for the allocator.
5825
5826     The checks for the completeness of the opcode must have already been
5827     done.
5828
5829     """
5830     instance = self.cfg.GetInstanceInfo(self.name)
5831     if instance is None:
5832       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5833                                    " IAllocator" % self.name)
5834
5835     if instance.disk_template not in constants.DTS_NET_MIRROR:
5836       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5837
5838     if len(instance.secondary_nodes) != 1:
5839       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5840
5841     self.required_nodes = 1
5842
5843     disk_space = _ComputeDiskSize(instance.disk_template,
5844                                   instance.disks[0].size,
5845                                   instance.disks[1].size)
5846
5847     request = {
5848       "type": "relocate",
5849       "name": self.name,
5850       "disk_space_total": disk_space,
5851       "required_nodes": self.required_nodes,
5852       "relocate_from": self.relocate_from,
5853       }
5854     self.in_data["request"] = request
5855
5856   def _BuildInputData(self):
5857     """Build input data structures.
5858
5859     """
5860     self._ComputeClusterData()
5861
5862     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5863       self._AddNewInstance()
5864     else:
5865       self._AddRelocateInstance()
5866
5867     self.in_text = serializer.Dump(self.in_data)
5868
5869   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5870     """Run an instance allocator and return the results.
5871
5872     """
5873     data = self.in_text
5874
5875     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5876
5877     if not isinstance(result, tuple) or len(result) != 4:
5878       raise errors.OpExecError("Invalid result from master iallocator runner")
5879
5880     rcode, stdout, stderr, fail = result
5881
5882     if rcode == constants.IARUN_NOTFOUND:
5883       raise errors.OpExecError("Can't find allocator '%s'" % name)
5884     elif rcode == constants.IARUN_FAILURE:
5885         raise errors.OpExecError("Instance allocator call failed: %s,"
5886                                  " output: %s" %
5887                                  (fail, stdout+stderr))
5888     self.out_text = stdout
5889     if validate:
5890       self._ValidateResult()
5891
5892   def _ValidateResult(self):
5893     """Process the allocator results.
5894
5895     This will process and if successful save the result in
5896     self.out_data and the other parameters.
5897
5898     """
5899     try:
5900       rdict = serializer.Load(self.out_text)
5901     except Exception, err:
5902       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5903
5904     if not isinstance(rdict, dict):
5905       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5906
5907     for key in "success", "info", "nodes":
5908       if key not in rdict:
5909         raise errors.OpExecError("Can't parse iallocator results:"
5910                                  " missing key '%s'" % key)
5911       setattr(self, key, rdict[key])
5912
5913     if not isinstance(rdict["nodes"], list):
5914       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5915                                " is not a list")
5916     self.out_data = rdict
5917
5918
5919 class LUTestAllocator(NoHooksLU):
5920   """Run allocator tests.
5921
5922   This LU runs the allocator tests
5923
5924   """
5925   _OP_REQP = ["direction", "mode", "name"]
5926
5927   def CheckPrereq(self):
5928     """Check prerequisites.
5929
5930     This checks the opcode parameters depending on the director and mode test.
5931
5932     """
5933     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5934       for attr in ["name", "mem_size", "disks", "disk_template",
5935                    "os", "tags", "nics", "vcpus"]:
5936         if not hasattr(self.op, attr):
5937           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5938                                      attr)
5939       iname = self.cfg.ExpandInstanceName(self.op.name)
5940       if iname is not None:
5941         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5942                                    iname)
5943       if not isinstance(self.op.nics, list):
5944         raise errors.OpPrereqError("Invalid parameter 'nics'")
5945       for row in self.op.nics:
5946         if (not isinstance(row, dict) or
5947             "mac" not in row or
5948             "ip" not in row or
5949             "bridge" not in row):
5950           raise errors.OpPrereqError("Invalid contents of the"
5951                                      " 'nics' parameter")
5952       if not isinstance(self.op.disks, list):
5953         raise errors.OpPrereqError("Invalid parameter 'disks'")
5954       if len(self.op.disks) != 2:
5955         raise errors.OpPrereqError("Only two-disk configurations supported")
5956       for row in self.op.disks:
5957         if (not isinstance(row, dict) or
5958             "size" not in row or
5959             not isinstance(row["size"], int) or
5960             "mode" not in row or
5961             row["mode"] not in ['r', 'w']):
5962           raise errors.OpPrereqError("Invalid contents of the"
5963                                      " 'disks' parameter")
5964     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5965       if not hasattr(self.op, "name"):
5966         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5967       fname = self.cfg.ExpandInstanceName(self.op.name)
5968       if fname is None:
5969         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5970                                    self.op.name)
5971       self.op.name = fname
5972       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5973     else:
5974       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5975                                  self.op.mode)
5976
5977     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5978       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5979         raise errors.OpPrereqError("Missing allocator name")
5980     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5981       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5982                                  self.op.direction)
5983
5984   def Exec(self, feedback_fn):
5985     """Run the allocator test.
5986
5987     """
5988     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5989       ial = IAllocator(self.cfg, self.sstore,
5990                        mode=self.op.mode,
5991                        name=self.op.name,
5992                        mem_size=self.op.mem_size,
5993                        disks=self.op.disks,
5994                        disk_template=self.op.disk_template,
5995                        os=self.op.os,
5996                        tags=self.op.tags,
5997                        nics=self.op.nics,
5998                        vcpus=self.op.vcpus,
5999                        )
6000     else:
6001       ial = IAllocator(self.cfg, self.sstore,
6002                        mode=self.op.mode,
6003                        name=self.op.name,
6004                        relocate_from=list(self.relocate_from),
6005                        )
6006
6007     if self.op.direction == constants.IALLOCATOR_DIR_IN:
6008       result = ial.in_text
6009     else:
6010       ial.Run(self.op.allocator, validate=False)
6011       result = ial.out_text
6012     return result