Implement node daemon conectivity tests
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           needed_mem += instance_cfg[instance].memory
836         if nodeinfo['mfree'] < needed_mem:
837           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
838                       " failovers should node %s fail" % (node, prinode))
839           bad = True
840     return bad
841
842   def CheckPrereq(self):
843     """Check prerequisites.
844
845     Transform the list of checks we're going to skip into a set and check that
846     all its members are valid.
847
848     """
849     self.skip_set = frozenset(self.op.skip_checks)
850     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851       raise errors.OpPrereqError("Invalid checks to be skipped specified")
852
853   def BuildHooksEnv(self):
854     """Build hooks env.
855
856     Cluster-Verify hooks just rone in the post phase and their failure makes
857     the output be logged in the verify output and the verification to fail.
858
859     """
860     all_nodes = self.cfg.GetNodeList()
861     # TODO: populate the environment with useful information for verify hooks
862     env = {}
863     return env, [], all_nodes
864
865   def Exec(self, feedback_fn):
866     """Verify integrity of cluster, performing various test on nodes.
867
868     """
869     bad = False
870     feedback_fn("* Verifying global settings")
871     for msg in self.cfg.VerifyConfig():
872       feedback_fn("  - ERROR: %s" % msg)
873
874     vg_name = self.cfg.GetVGName()
875     nodelist = utils.NiceSort(self.cfg.GetNodeList())
876     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
877     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
878     i_non_redundant = [] # Non redundant instances
879     node_volume = {}
880     node_instance = {}
881     node_info = {}
882     instance_cfg = {}
883
884     # FIXME: verify OS list
885     # do local checksums
886     file_names = list(self.sstore.GetFileList())
887     file_names.append(constants.SSL_CERT_FILE)
888     file_names.append(constants.CLUSTER_CONF_FILE)
889     local_checksums = utils.FingerprintFiles(file_names)
890
891     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
892     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
893     all_instanceinfo = rpc.call_instance_list(nodelist)
894     all_vglist = rpc.call_vg_list(nodelist)
895     node_verify_param = {
896       'filelist': file_names,
897       'nodelist': nodelist,
898       'hypervisor': None,
899       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
900                         for node in nodeinfo]
901       }
902     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
903     all_rversion = rpc.call_version(nodelist)
904     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
905
906     for node in nodelist:
907       feedback_fn("* Verifying node %s" % node)
908       result = self._VerifyNode(node, file_names, local_checksums,
909                                 all_vglist[node], all_nvinfo[node],
910                                 all_rversion[node], feedback_fn)
911       bad = bad or result
912
913       # node_volume
914       volumeinfo = all_volumeinfo[node]
915
916       if isinstance(volumeinfo, basestring):
917         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
918                     (node, volumeinfo[-400:].encode('string_escape')))
919         bad = True
920         node_volume[node] = {}
921       elif not isinstance(volumeinfo, dict):
922         feedback_fn("  - ERROR: connection to %s failed" % (node,))
923         bad = True
924         continue
925       else:
926         node_volume[node] = volumeinfo
927
928       # node_instance
929       nodeinstance = all_instanceinfo[node]
930       if type(nodeinstance) != list:
931         feedback_fn("  - ERROR: connection to %s failed" % (node,))
932         bad = True
933         continue
934
935       node_instance[node] = nodeinstance
936
937       # node_info
938       nodeinfo = all_ninfo[node]
939       if not isinstance(nodeinfo, dict):
940         feedback_fn("  - ERROR: connection to %s failed" % (node,))
941         bad = True
942         continue
943
944       try:
945         node_info[node] = {
946           "mfree": int(nodeinfo['memory_free']),
947           "dfree": int(nodeinfo['vg_free']),
948           "pinst": [],
949           "sinst": [],
950           # dictionary holding all instances this node is secondary for,
951           # grouped by their primary node. Each key is a cluster node, and each
952           # value is a list of instances which have the key as primary and the
953           # current node as secondary.  this is handy to calculate N+1 memory
954           # availability if you can only failover from a primary to its
955           # secondary.
956           "sinst-by-pnode": {},
957         }
958       except ValueError:
959         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
960         bad = True
961         continue
962
963     node_vol_should = {}
964
965     for instance in instancelist:
966       feedback_fn("* Verifying instance %s" % instance)
967       inst_config = self.cfg.GetInstanceInfo(instance)
968       result =  self._VerifyInstance(instance, inst_config, node_volume,
969                                      node_instance, feedback_fn)
970       bad = bad or result
971
972       inst_config.MapLVsByNode(node_vol_should)
973
974       instance_cfg[instance] = inst_config
975
976       pnode = inst_config.primary_node
977       if pnode in node_info:
978         node_info[pnode]['pinst'].append(instance)
979       else:
980         feedback_fn("  - ERROR: instance %s, connection to primary node"
981                     " %s failed" % (instance, pnode))
982         bad = True
983
984       # If the instance is non-redundant we cannot survive losing its primary
985       # node, so we are not N+1 compliant. On the other hand we have no disk
986       # templates with more than one secondary so that situation is not well
987       # supported either.
988       # FIXME: does not support file-backed instances
989       if len(inst_config.secondary_nodes) == 0:
990         i_non_redundant.append(instance)
991       elif len(inst_config.secondary_nodes) > 1:
992         feedback_fn("  - WARNING: multiple secondaries for instance %s"
993                     % instance)
994
995       for snode in inst_config.secondary_nodes:
996         if snode in node_info:
997           node_info[snode]['sinst'].append(instance)
998           if pnode not in node_info[snode]['sinst-by-pnode']:
999             node_info[snode]['sinst-by-pnode'][pnode] = []
1000           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1001         else:
1002           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1003                       " %s failed" % (instance, snode))
1004
1005     feedback_fn("* Verifying orphan volumes")
1006     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1007                                        feedback_fn)
1008     bad = bad or result
1009
1010     feedback_fn("* Verifying remaining instances")
1011     result = self._VerifyOrphanInstances(instancelist, node_instance,
1012                                          feedback_fn)
1013     bad = bad or result
1014
1015     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1016       feedback_fn("* Verifying N+1 Memory redundancy")
1017       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1018       bad = bad or result
1019
1020     feedback_fn("* Other Notes")
1021     if i_non_redundant:
1022       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1023                   % len(i_non_redundant))
1024
1025     return int(bad)
1026
1027   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1028     """Analize the post-hooks' result, handle it, and send some
1029     nicely-formatted feedback back to the user.
1030
1031     Args:
1032       phase: the hooks phase that has just been run
1033       hooks_results: the results of the multi-node hooks rpc call
1034       feedback_fn: function to send feedback back to the caller
1035       lu_result: previous Exec result
1036
1037     """
1038     # We only really run POST phase hooks, and are only interested in their results
1039     if phase == constants.HOOKS_PHASE_POST:
1040       # Used to change hooks' output to proper indentation
1041       indent_re = re.compile('^', re.M)
1042       feedback_fn("* Hooks Results")
1043       if not hooks_results:
1044         feedback_fn("  - ERROR: general communication failure")
1045         lu_result = 1
1046       else:
1047         for node_name in hooks_results:
1048           show_node_header = True
1049           res = hooks_results[node_name]
1050           if res is False or not isinstance(res, list):
1051             feedback_fn("    Communication failure")
1052             lu_result = 1
1053             continue
1054           for script, hkr, output in res:
1055             if hkr == constants.HKR_FAIL:
1056               # The node header is only shown once, if there are
1057               # failing hooks on that node
1058               if show_node_header:
1059                 feedback_fn("  Node %s:" % node_name)
1060                 show_node_header = False
1061               feedback_fn("    ERROR: Script %s failed, output:" % script)
1062               output = indent_re.sub('      ', output)
1063               feedback_fn("%s" % output)
1064               lu_result = 1
1065
1066       return lu_result
1067
1068
1069 class LUVerifyDisks(NoHooksLU):
1070   """Verifies the cluster disks status.
1071
1072   """
1073   _OP_REQP = []
1074
1075   def CheckPrereq(self):
1076     """Check prerequisites.
1077
1078     This has no prerequisites.
1079
1080     """
1081     pass
1082
1083   def Exec(self, feedback_fn):
1084     """Verify integrity of cluster disks.
1085
1086     """
1087     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1088
1089     vg_name = self.cfg.GetVGName()
1090     nodes = utils.NiceSort(self.cfg.GetNodeList())
1091     instances = [self.cfg.GetInstanceInfo(name)
1092                  for name in self.cfg.GetInstanceList()]
1093
1094     nv_dict = {}
1095     for inst in instances:
1096       inst_lvs = {}
1097       if (inst.status != "up" or
1098           inst.disk_template not in constants.DTS_NET_MIRROR):
1099         continue
1100       inst.MapLVsByNode(inst_lvs)
1101       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1102       for node, vol_list in inst_lvs.iteritems():
1103         for vol in vol_list:
1104           nv_dict[(node, vol)] = inst
1105
1106     if not nv_dict:
1107       return result
1108
1109     node_lvs = rpc.call_volume_list(nodes, vg_name)
1110
1111     to_act = set()
1112     for node in nodes:
1113       # node_volume
1114       lvs = node_lvs[node]
1115
1116       if isinstance(lvs, basestring):
1117         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1118         res_nlvm[node] = lvs
1119       elif not isinstance(lvs, dict):
1120         logger.Info("connection to node %s failed or invalid data returned" %
1121                     (node,))
1122         res_nodes.append(node)
1123         continue
1124
1125       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1126         inst = nv_dict.pop((node, lv_name), None)
1127         if (not lv_online and inst is not None
1128             and inst.name not in res_instances):
1129           res_instances.append(inst.name)
1130
1131     # any leftover items in nv_dict are missing LVs, let's arrange the
1132     # data better
1133     for key, inst in nv_dict.iteritems():
1134       if inst.name not in res_missing:
1135         res_missing[inst.name] = []
1136       res_missing[inst.name].append(key)
1137
1138     return result
1139
1140
1141 class LURenameCluster(LogicalUnit):
1142   """Rename the cluster.
1143
1144   """
1145   HPATH = "cluster-rename"
1146   HTYPE = constants.HTYPE_CLUSTER
1147   _OP_REQP = ["name"]
1148
1149   def BuildHooksEnv(self):
1150     """Build hooks env.
1151
1152     """
1153     env = {
1154       "OP_TARGET": self.sstore.GetClusterName(),
1155       "NEW_NAME": self.op.name,
1156       }
1157     mn = self.sstore.GetMasterNode()
1158     return env, [mn], [mn]
1159
1160   def CheckPrereq(self):
1161     """Verify that the passed name is a valid one.
1162
1163     """
1164     hostname = utils.HostInfo(self.op.name)
1165
1166     new_name = hostname.name
1167     self.ip = new_ip = hostname.ip
1168     old_name = self.sstore.GetClusterName()
1169     old_ip = self.sstore.GetMasterIP()
1170     if new_name == old_name and new_ip == old_ip:
1171       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1172                                  " cluster has changed")
1173     if new_ip != old_ip:
1174       result = utils.RunCmd(["fping", "-q", new_ip])
1175       if not result.failed:
1176         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1177                                    " reachable on the network. Aborting." %
1178                                    new_ip)
1179
1180     self.op.name = new_name
1181
1182   def Exec(self, feedback_fn):
1183     """Rename the cluster.
1184
1185     """
1186     clustername = self.op.name
1187     ip = self.ip
1188     ss = self.sstore
1189
1190     # shutdown the master IP
1191     master = ss.GetMasterNode()
1192     if not rpc.call_node_stop_master(master):
1193       raise errors.OpExecError("Could not disable the master role")
1194
1195     try:
1196       # modify the sstore
1197       ss.SetKey(ss.SS_MASTER_IP, ip)
1198       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1199
1200       # Distribute updated ss config to all nodes
1201       myself = self.cfg.GetNodeInfo(master)
1202       dist_nodes = self.cfg.GetNodeList()
1203       if myself.name in dist_nodes:
1204         dist_nodes.remove(myself.name)
1205
1206       logger.Debug("Copying updated ssconf data to all nodes")
1207       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1208         fname = ss.KeyToFilename(keyname)
1209         result = rpc.call_upload_file(dist_nodes, fname)
1210         for to_node in dist_nodes:
1211           if not result[to_node]:
1212             logger.Error("copy of file %s to node %s failed" %
1213                          (fname, to_node))
1214     finally:
1215       if not rpc.call_node_start_master(master):
1216         logger.Error("Could not re-enable the master role on the master,"
1217                      " please restart manually.")
1218
1219
1220 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1221   """Sleep and poll for an instance's disk to sync.
1222
1223   """
1224   if not instance.disks:
1225     return True
1226
1227   if not oneshot:
1228     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1229
1230   node = instance.primary_node
1231
1232   for dev in instance.disks:
1233     cfgw.SetDiskID(dev, node)
1234
1235   retries = 0
1236   while True:
1237     max_time = 0
1238     done = True
1239     cumul_degraded = False
1240     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1241     if not rstats:
1242       proc.LogWarning("Can't get any data from node %s" % node)
1243       retries += 1
1244       if retries >= 10:
1245         raise errors.RemoteError("Can't contact node %s for mirror data,"
1246                                  " aborting." % node)
1247       time.sleep(6)
1248       continue
1249     retries = 0
1250     for i in range(len(rstats)):
1251       mstat = rstats[i]
1252       if mstat is None:
1253         proc.LogWarning("Can't compute data for node %s/%s" %
1254                         (node, instance.disks[i].iv_name))
1255         continue
1256       # we ignore the ldisk parameter
1257       perc_done, est_time, is_degraded, _ = mstat
1258       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1259       if perc_done is not None:
1260         done = False
1261         if est_time is not None:
1262           rem_time = "%d estimated seconds remaining" % est_time
1263           max_time = est_time
1264         else:
1265           rem_time = "no time estimate"
1266         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1267                      (instance.disks[i].iv_name, perc_done, rem_time))
1268     if done or oneshot:
1269       break
1270
1271     if unlock:
1272       utils.Unlock('cmd')
1273     try:
1274       time.sleep(min(60, max_time))
1275     finally:
1276       if unlock:
1277         utils.Lock('cmd')
1278
1279   if done:
1280     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1281   return not cumul_degraded
1282
1283
1284 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1285   """Check that mirrors are not degraded.
1286
1287   The ldisk parameter, if True, will change the test from the
1288   is_degraded attribute (which represents overall non-ok status for
1289   the device(s)) to the ldisk (representing the local storage status).
1290
1291   """
1292   cfgw.SetDiskID(dev, node)
1293   if ldisk:
1294     idx = 6
1295   else:
1296     idx = 5
1297
1298   result = True
1299   if on_primary or dev.AssembleOnSecondary():
1300     rstats = rpc.call_blockdev_find(node, dev)
1301     if not rstats:
1302       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1303       result = False
1304     else:
1305       result = result and (not rstats[idx])
1306   if dev.children:
1307     for child in dev.children:
1308       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1309
1310   return result
1311
1312
1313 class LUDiagnoseOS(NoHooksLU):
1314   """Logical unit for OS diagnose/query.
1315
1316   """
1317   _OP_REQP = ["output_fields", "names"]
1318
1319   def CheckPrereq(self):
1320     """Check prerequisites.
1321
1322     This always succeeds, since this is a pure query LU.
1323
1324     """
1325     if self.op.names:
1326       raise errors.OpPrereqError("Selective OS query not supported")
1327
1328     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1329     _CheckOutputFields(static=[],
1330                        dynamic=self.dynamic_fields,
1331                        selected=self.op.output_fields)
1332
1333   @staticmethod
1334   def _DiagnoseByOS(node_list, rlist):
1335     """Remaps a per-node return list into an a per-os per-node dictionary
1336
1337       Args:
1338         node_list: a list with the names of all nodes
1339         rlist: a map with node names as keys and OS objects as values
1340
1341       Returns:
1342         map: a map with osnames as keys and as value another map, with
1343              nodes as
1344              keys and list of OS objects as values
1345              e.g. {"debian-etch": {"node1": [<object>,...],
1346                                    "node2": [<object>,]}
1347                   }
1348
1349     """
1350     all_os = {}
1351     for node_name, nr in rlist.iteritems():
1352       if not nr:
1353         continue
1354       for os in nr:
1355         if os.name not in all_os:
1356           # build a list of nodes for this os containing empty lists
1357           # for each node in node_list
1358           all_os[os.name] = {}
1359           for nname in node_list:
1360             all_os[os.name][nname] = []
1361         all_os[os.name][node_name].append(os)
1362     return all_os
1363
1364   def Exec(self, feedback_fn):
1365     """Compute the list of OSes.
1366
1367     """
1368     node_list = self.cfg.GetNodeList()
1369     node_data = rpc.call_os_diagnose(node_list)
1370     if node_data == False:
1371       raise errors.OpExecError("Can't gather the list of OSes")
1372     pol = self._DiagnoseByOS(node_list, node_data)
1373     output = []
1374     for os_name, os_data in pol.iteritems():
1375       row = []
1376       for field in self.op.output_fields:
1377         if field == "name":
1378           val = os_name
1379         elif field == "valid":
1380           val = utils.all([osl and osl[0] for osl in os_data.values()])
1381         elif field == "node_status":
1382           val = {}
1383           for node_name, nos_list in os_data.iteritems():
1384             val[node_name] = [(v.status, v.path) for v in nos_list]
1385         else:
1386           raise errors.ParameterError(field)
1387         row.append(val)
1388       output.append(row)
1389
1390     return output
1391
1392
1393 class LURemoveNode(LogicalUnit):
1394   """Logical unit for removing a node.
1395
1396   """
1397   HPATH = "node-remove"
1398   HTYPE = constants.HTYPE_NODE
1399   _OP_REQP = ["node_name"]
1400
1401   def BuildHooksEnv(self):
1402     """Build hooks env.
1403
1404     This doesn't run on the target node in the pre phase as a failed
1405     node would not allows itself to run.
1406
1407     """
1408     env = {
1409       "OP_TARGET": self.op.node_name,
1410       "NODE_NAME": self.op.node_name,
1411       }
1412     all_nodes = self.cfg.GetNodeList()
1413     all_nodes.remove(self.op.node_name)
1414     return env, all_nodes, all_nodes
1415
1416   def CheckPrereq(self):
1417     """Check prerequisites.
1418
1419     This checks:
1420      - the node exists in the configuration
1421      - it does not have primary or secondary instances
1422      - it's not the master
1423
1424     Any errors are signalled by raising errors.OpPrereqError.
1425
1426     """
1427     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1428     if node is None:
1429       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1430
1431     instance_list = self.cfg.GetInstanceList()
1432
1433     masternode = self.sstore.GetMasterNode()
1434     if node.name == masternode:
1435       raise errors.OpPrereqError("Node is the master node,"
1436                                  " you need to failover first.")
1437
1438     for instance_name in instance_list:
1439       instance = self.cfg.GetInstanceInfo(instance_name)
1440       if node.name == instance.primary_node:
1441         raise errors.OpPrereqError("Instance %s still running on the node,"
1442                                    " please remove first." % instance_name)
1443       if node.name in instance.secondary_nodes:
1444         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1445                                    " please remove first." % instance_name)
1446     self.op.node_name = node.name
1447     self.node = node
1448
1449   def Exec(self, feedback_fn):
1450     """Removes the node from the cluster.
1451
1452     """
1453     node = self.node
1454     logger.Info("stopping the node daemon and removing configs from node %s" %
1455                 node.name)
1456
1457     rpc.call_node_leave_cluster(node.name)
1458
1459     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1460
1461     logger.Info("Removing node %s from config" % node.name)
1462
1463     self.cfg.RemoveNode(node.name)
1464
1465     _RemoveHostFromEtcHosts(node.name)
1466
1467
1468 class LUQueryNodes(NoHooksLU):
1469   """Logical unit for querying nodes.
1470
1471   """
1472   _OP_REQP = ["output_fields", "names"]
1473
1474   def CheckPrereq(self):
1475     """Check prerequisites.
1476
1477     This checks that the fields required are valid output fields.
1478
1479     """
1480     self.dynamic_fields = frozenset([
1481       "dtotal", "dfree",
1482       "mtotal", "mnode", "mfree",
1483       "bootid",
1484       "ctotal",
1485       ])
1486
1487     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1488                                "pinst_list", "sinst_list",
1489                                "pip", "sip"],
1490                        dynamic=self.dynamic_fields,
1491                        selected=self.op.output_fields)
1492
1493     self.wanted = _GetWantedNodes(self, self.op.names)
1494
1495   def Exec(self, feedback_fn):
1496     """Computes the list of nodes and their attributes.
1497
1498     """
1499     nodenames = self.wanted
1500     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1501
1502     # begin data gathering
1503
1504     if self.dynamic_fields.intersection(self.op.output_fields):
1505       live_data = {}
1506       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1507       for name in nodenames:
1508         nodeinfo = node_data.get(name, None)
1509         if nodeinfo:
1510           live_data[name] = {
1511             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1512             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1513             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1514             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1515             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1516             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1517             "bootid": nodeinfo['bootid'],
1518             }
1519         else:
1520           live_data[name] = {}
1521     else:
1522       live_data = dict.fromkeys(nodenames, {})
1523
1524     node_to_primary = dict([(name, set()) for name in nodenames])
1525     node_to_secondary = dict([(name, set()) for name in nodenames])
1526
1527     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1528                              "sinst_cnt", "sinst_list"))
1529     if inst_fields & frozenset(self.op.output_fields):
1530       instancelist = self.cfg.GetInstanceList()
1531
1532       for instance_name in instancelist:
1533         inst = self.cfg.GetInstanceInfo(instance_name)
1534         if inst.primary_node in node_to_primary:
1535           node_to_primary[inst.primary_node].add(inst.name)
1536         for secnode in inst.secondary_nodes:
1537           if secnode in node_to_secondary:
1538             node_to_secondary[secnode].add(inst.name)
1539
1540     # end data gathering
1541
1542     output = []
1543     for node in nodelist:
1544       node_output = []
1545       for field in self.op.output_fields:
1546         if field == "name":
1547           val = node.name
1548         elif field == "pinst_list":
1549           val = list(node_to_primary[node.name])
1550         elif field == "sinst_list":
1551           val = list(node_to_secondary[node.name])
1552         elif field == "pinst_cnt":
1553           val = len(node_to_primary[node.name])
1554         elif field == "sinst_cnt":
1555           val = len(node_to_secondary[node.name])
1556         elif field == "pip":
1557           val = node.primary_ip
1558         elif field == "sip":
1559           val = node.secondary_ip
1560         elif field in self.dynamic_fields:
1561           val = live_data[node.name].get(field, None)
1562         else:
1563           raise errors.ParameterError(field)
1564         node_output.append(val)
1565       output.append(node_output)
1566
1567     return output
1568
1569
1570 class LUQueryNodeVolumes(NoHooksLU):
1571   """Logical unit for getting volumes on node(s).
1572
1573   """
1574   _OP_REQP = ["nodes", "output_fields"]
1575
1576   def CheckPrereq(self):
1577     """Check prerequisites.
1578
1579     This checks that the fields required are valid output fields.
1580
1581     """
1582     self.nodes = _GetWantedNodes(self, self.op.nodes)
1583
1584     _CheckOutputFields(static=["node"],
1585                        dynamic=["phys", "vg", "name", "size", "instance"],
1586                        selected=self.op.output_fields)
1587
1588
1589   def Exec(self, feedback_fn):
1590     """Computes the list of nodes and their attributes.
1591
1592     """
1593     nodenames = self.nodes
1594     volumes = rpc.call_node_volumes(nodenames)
1595
1596     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1597              in self.cfg.GetInstanceList()]
1598
1599     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1600
1601     output = []
1602     for node in nodenames:
1603       if node not in volumes or not volumes[node]:
1604         continue
1605
1606       node_vols = volumes[node][:]
1607       node_vols.sort(key=lambda vol: vol['dev'])
1608
1609       for vol in node_vols:
1610         node_output = []
1611         for field in self.op.output_fields:
1612           if field == "node":
1613             val = node
1614           elif field == "phys":
1615             val = vol['dev']
1616           elif field == "vg":
1617             val = vol['vg']
1618           elif field == "name":
1619             val = vol['name']
1620           elif field == "size":
1621             val = int(float(vol['size']))
1622           elif field == "instance":
1623             for inst in ilist:
1624               if node not in lv_by_node[inst]:
1625                 continue
1626               if vol['name'] in lv_by_node[inst][node]:
1627                 val = inst.name
1628                 break
1629             else:
1630               val = '-'
1631           else:
1632             raise errors.ParameterError(field)
1633           node_output.append(str(val))
1634
1635         output.append(node_output)
1636
1637     return output
1638
1639
1640 class LUAddNode(LogicalUnit):
1641   """Logical unit for adding node to the cluster.
1642
1643   """
1644   HPATH = "node-add"
1645   HTYPE = constants.HTYPE_NODE
1646   _OP_REQP = ["node_name"]
1647
1648   def BuildHooksEnv(self):
1649     """Build hooks env.
1650
1651     This will run on all nodes before, and on all nodes + the new node after.
1652
1653     """
1654     env = {
1655       "OP_TARGET": self.op.node_name,
1656       "NODE_NAME": self.op.node_name,
1657       "NODE_PIP": self.op.primary_ip,
1658       "NODE_SIP": self.op.secondary_ip,
1659       }
1660     nodes_0 = self.cfg.GetNodeList()
1661     nodes_1 = nodes_0 + [self.op.node_name, ]
1662     return env, nodes_0, nodes_1
1663
1664   def CheckPrereq(self):
1665     """Check prerequisites.
1666
1667     This checks:
1668      - the new node is not already in the config
1669      - it is resolvable
1670      - its parameters (single/dual homed) matches the cluster
1671
1672     Any errors are signalled by raising errors.OpPrereqError.
1673
1674     """
1675     node_name = self.op.node_name
1676     cfg = self.cfg
1677
1678     dns_data = utils.HostInfo(node_name)
1679
1680     node = dns_data.name
1681     primary_ip = self.op.primary_ip = dns_data.ip
1682     secondary_ip = getattr(self.op, "secondary_ip", None)
1683     if secondary_ip is None:
1684       secondary_ip = primary_ip
1685     if not utils.IsValidIP(secondary_ip):
1686       raise errors.OpPrereqError("Invalid secondary IP given")
1687     self.op.secondary_ip = secondary_ip
1688
1689     node_list = cfg.GetNodeList()
1690     if not self.op.readd and node in node_list:
1691       raise errors.OpPrereqError("Node %s is already in the configuration" %
1692                                  node)
1693     elif self.op.readd and node not in node_list:
1694       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1695
1696     for existing_node_name in node_list:
1697       existing_node = cfg.GetNodeInfo(existing_node_name)
1698
1699       if self.op.readd and node == existing_node_name:
1700         if (existing_node.primary_ip != primary_ip or
1701             existing_node.secondary_ip != secondary_ip):
1702           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1703                                      " address configuration as before")
1704         continue
1705
1706       if (existing_node.primary_ip == primary_ip or
1707           existing_node.secondary_ip == primary_ip or
1708           existing_node.primary_ip == secondary_ip or
1709           existing_node.secondary_ip == secondary_ip):
1710         raise errors.OpPrereqError("New node ip address(es) conflict with"
1711                                    " existing node %s" % existing_node.name)
1712
1713     # check that the type of the node (single versus dual homed) is the
1714     # same as for the master
1715     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1716     master_singlehomed = myself.secondary_ip == myself.primary_ip
1717     newbie_singlehomed = secondary_ip == primary_ip
1718     if master_singlehomed != newbie_singlehomed:
1719       if master_singlehomed:
1720         raise errors.OpPrereqError("The master has no private ip but the"
1721                                    " new node has one")
1722       else:
1723         raise errors.OpPrereqError("The master has a private ip but the"
1724                                    " new node doesn't have one")
1725
1726     # checks reachablity
1727     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1728       raise errors.OpPrereqError("Node not reachable by ping")
1729
1730     if not newbie_singlehomed:
1731       # check reachability from my secondary ip to newbie's secondary ip
1732       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1733                            source=myself.secondary_ip):
1734         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1735                                    " based ping to noded port")
1736
1737     self.new_node = objects.Node(name=node,
1738                                  primary_ip=primary_ip,
1739                                  secondary_ip=secondary_ip)
1740
1741     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1742       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1743         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1744                                    constants.VNC_PASSWORD_FILE)
1745
1746   def Exec(self, feedback_fn):
1747     """Adds the new node to the cluster.
1748
1749     """
1750     new_node = self.new_node
1751     node = new_node.name
1752
1753     # set up inter-node password and certificate and restarts the node daemon
1754     gntpass = self.sstore.GetNodeDaemonPassword()
1755     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1756       raise errors.OpExecError("ganeti password corruption detected")
1757     f = open(constants.SSL_CERT_FILE)
1758     try:
1759       gntpem = f.read(8192)
1760     finally:
1761       f.close()
1762     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1763     # so we use this to detect an invalid certificate; as long as the
1764     # cert doesn't contain this, the here-document will be correctly
1765     # parsed by the shell sequence below
1766     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1767       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1768     if not gntpem.endswith("\n"):
1769       raise errors.OpExecError("PEM must end with newline")
1770     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1771
1772     # and then connect with ssh to set password and start ganeti-noded
1773     # note that all the below variables are sanitized at this point,
1774     # either by being constants or by the checks above
1775     ss = self.sstore
1776     mycommand = ("umask 077 && "
1777                  "echo '%s' > '%s' && "
1778                  "cat > '%s' << '!EOF.' && \n"
1779                  "%s!EOF.\n%s restart" %
1780                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1781                   constants.SSL_CERT_FILE, gntpem,
1782                   constants.NODE_INITD_SCRIPT))
1783
1784     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1785     if result.failed:
1786       raise errors.OpExecError("Remote command on node %s, error: %s,"
1787                                " output: %s" %
1788                                (node, result.fail_reason, result.output))
1789
1790     # check connectivity
1791     time.sleep(4)
1792
1793     result = rpc.call_version([node])[node]
1794     if result:
1795       if constants.PROTOCOL_VERSION == result:
1796         logger.Info("communication to node %s fine, sw version %s match" %
1797                     (node, result))
1798       else:
1799         raise errors.OpExecError("Version mismatch master version %s,"
1800                                  " node version %s" %
1801                                  (constants.PROTOCOL_VERSION, result))
1802     else:
1803       raise errors.OpExecError("Cannot get version from the new node")
1804
1805     # setup ssh on node
1806     logger.Info("copy ssh key to node %s" % node)
1807     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1808     keyarray = []
1809     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1810                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1811                 priv_key, pub_key]
1812
1813     for i in keyfiles:
1814       f = open(i, 'r')
1815       try:
1816         keyarray.append(f.read())
1817       finally:
1818         f.close()
1819
1820     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1821                                keyarray[3], keyarray[4], keyarray[5])
1822
1823     if not result:
1824       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1825
1826     # Add node to our /etc/hosts, and add key to known_hosts
1827     _AddHostToEtcHosts(new_node.name)
1828
1829     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1830                       self.cfg.GetHostKey())
1831
1832     if new_node.secondary_ip != new_node.primary_ip:
1833       if not rpc.call_node_tcp_ping(new_node.name,
1834                                     constants.LOCALHOST_IP_ADDRESS,
1835                                     new_node.secondary_ip,
1836                                     constants.DEFAULT_NODED_PORT,
1837                                     10, False):
1838         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1839                                  " you gave (%s). Please fix and re-run this"
1840                                  " command." % new_node.secondary_ip)
1841
1842     success, msg = ssh.VerifyNodeHostname(node)
1843     if not success:
1844       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1845                                " than the one the resolver gives: %s."
1846                                " Please fix and re-run this command." %
1847                                (node, msg))
1848
1849     # Distribute updated /etc/hosts and known_hosts to all nodes,
1850     # including the node just added
1851     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1852     dist_nodes = self.cfg.GetNodeList()
1853     if not self.op.readd:
1854       dist_nodes.append(node)
1855     if myself.name in dist_nodes:
1856       dist_nodes.remove(myself.name)
1857
1858     logger.Debug("Copying hosts and known_hosts to all nodes")
1859     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1860       result = rpc.call_upload_file(dist_nodes, fname)
1861       for to_node in dist_nodes:
1862         if not result[to_node]:
1863           logger.Error("copy of file %s to node %s failed" %
1864                        (fname, to_node))
1865
1866     to_copy = ss.GetFileList()
1867     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1868       to_copy.append(constants.VNC_PASSWORD_FILE)
1869     for fname in to_copy:
1870       if not ssh.CopyFileToNode(node, fname):
1871         logger.Error("could not copy file %s to node %s" % (fname, node))
1872
1873     if not self.op.readd:
1874       logger.Info("adding node %s to cluster.conf" % node)
1875       self.cfg.AddNode(new_node)
1876
1877
1878 class LUMasterFailover(LogicalUnit):
1879   """Failover the master node to the current node.
1880
1881   This is a special LU in that it must run on a non-master node.
1882
1883   """
1884   HPATH = "master-failover"
1885   HTYPE = constants.HTYPE_CLUSTER
1886   REQ_MASTER = False
1887   _OP_REQP = []
1888
1889   def BuildHooksEnv(self):
1890     """Build hooks env.
1891
1892     This will run on the new master only in the pre phase, and on all
1893     the nodes in the post phase.
1894
1895     """
1896     env = {
1897       "OP_TARGET": self.new_master,
1898       "NEW_MASTER": self.new_master,
1899       "OLD_MASTER": self.old_master,
1900       }
1901     return env, [self.new_master], self.cfg.GetNodeList()
1902
1903   def CheckPrereq(self):
1904     """Check prerequisites.
1905
1906     This checks that we are not already the master.
1907
1908     """
1909     self.new_master = utils.HostInfo().name
1910     self.old_master = self.sstore.GetMasterNode()
1911
1912     if self.old_master == self.new_master:
1913       raise errors.OpPrereqError("This commands must be run on the node"
1914                                  " where you want the new master to be."
1915                                  " %s is already the master" %
1916                                  self.old_master)
1917
1918   def Exec(self, feedback_fn):
1919     """Failover the master node.
1920
1921     This command, when run on a non-master node, will cause the current
1922     master to cease being master, and the non-master to become new
1923     master.
1924
1925     """
1926     #TODO: do not rely on gethostname returning the FQDN
1927     logger.Info("setting master to %s, old master: %s" %
1928                 (self.new_master, self.old_master))
1929
1930     if not rpc.call_node_stop_master(self.old_master):
1931       logger.Error("could disable the master role on the old master"
1932                    " %s, please disable manually" % self.old_master)
1933
1934     ss = self.sstore
1935     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1936     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1937                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1938       logger.Error("could not distribute the new simple store master file"
1939                    " to the other nodes, please check.")
1940
1941     if not rpc.call_node_start_master(self.new_master):
1942       logger.Error("could not start the master role on the new master"
1943                    " %s, please check" % self.new_master)
1944       feedback_fn("Error in activating the master IP on the new master,"
1945                   " please fix manually.")
1946
1947
1948
1949 class LUQueryClusterInfo(NoHooksLU):
1950   """Query cluster configuration.
1951
1952   """
1953   _OP_REQP = []
1954   REQ_MASTER = False
1955
1956   def CheckPrereq(self):
1957     """No prerequsites needed for this LU.
1958
1959     """
1960     pass
1961
1962   def Exec(self, feedback_fn):
1963     """Return cluster config.
1964
1965     """
1966     result = {
1967       "name": self.sstore.GetClusterName(),
1968       "software_version": constants.RELEASE_VERSION,
1969       "protocol_version": constants.PROTOCOL_VERSION,
1970       "config_version": constants.CONFIG_VERSION,
1971       "os_api_version": constants.OS_API_VERSION,
1972       "export_version": constants.EXPORT_VERSION,
1973       "master": self.sstore.GetMasterNode(),
1974       "architecture": (platform.architecture()[0], platform.machine()),
1975       "hypervisor_type": self.sstore.GetHypervisorType(),
1976       }
1977
1978     return result
1979
1980
1981 class LUClusterCopyFile(NoHooksLU):
1982   """Copy file to cluster.
1983
1984   """
1985   _OP_REQP = ["nodes", "filename"]
1986
1987   def CheckPrereq(self):
1988     """Check prerequisites.
1989
1990     It should check that the named file exists and that the given list
1991     of nodes is valid.
1992
1993     """
1994     if not os.path.exists(self.op.filename):
1995       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1996
1997     self.nodes = _GetWantedNodes(self, self.op.nodes)
1998
1999   def Exec(self, feedback_fn):
2000     """Copy a file from master to some nodes.
2001
2002     Args:
2003       opts - class with options as members
2004       args - list containing a single element, the file name
2005     Opts used:
2006       nodes - list containing the name of target nodes; if empty, all nodes
2007
2008     """
2009     filename = self.op.filename
2010
2011     myname = utils.HostInfo().name
2012
2013     for node in self.nodes:
2014       if node == myname:
2015         continue
2016       if not ssh.CopyFileToNode(node, filename):
2017         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2018
2019
2020 class LUDumpClusterConfig(NoHooksLU):
2021   """Return a text-representation of the cluster-config.
2022
2023   """
2024   _OP_REQP = []
2025
2026   def CheckPrereq(self):
2027     """No prerequisites.
2028
2029     """
2030     pass
2031
2032   def Exec(self, feedback_fn):
2033     """Dump a representation of the cluster config to the standard output.
2034
2035     """
2036     return self.cfg.DumpConfig()
2037
2038
2039 class LURunClusterCommand(NoHooksLU):
2040   """Run a command on some nodes.
2041
2042   """
2043   _OP_REQP = ["command", "nodes"]
2044
2045   def CheckPrereq(self):
2046     """Check prerequisites.
2047
2048     It checks that the given list of nodes is valid.
2049
2050     """
2051     self.nodes = _GetWantedNodes(self, self.op.nodes)
2052
2053   def Exec(self, feedback_fn):
2054     """Run a command on some nodes.
2055
2056     """
2057     # put the master at the end of the nodes list
2058     master_node = self.sstore.GetMasterNode()
2059     if master_node in self.nodes:
2060       self.nodes.remove(master_node)
2061       self.nodes.append(master_node)
2062
2063     data = []
2064     for node in self.nodes:
2065       result = ssh.SSHCall(node, "root", self.op.command)
2066       data.append((node, result.output, result.exit_code))
2067
2068     return data
2069
2070
2071 class LUActivateInstanceDisks(NoHooksLU):
2072   """Bring up an instance's disks.
2073
2074   """
2075   _OP_REQP = ["instance_name"]
2076
2077   def CheckPrereq(self):
2078     """Check prerequisites.
2079
2080     This checks that the instance is in the cluster.
2081
2082     """
2083     instance = self.cfg.GetInstanceInfo(
2084       self.cfg.ExpandInstanceName(self.op.instance_name))
2085     if instance is None:
2086       raise errors.OpPrereqError("Instance '%s' not known" %
2087                                  self.op.instance_name)
2088     self.instance = instance
2089
2090
2091   def Exec(self, feedback_fn):
2092     """Activate the disks.
2093
2094     """
2095     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2096     if not disks_ok:
2097       raise errors.OpExecError("Cannot activate block devices")
2098
2099     return disks_info
2100
2101
2102 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2103   """Prepare the block devices for an instance.
2104
2105   This sets up the block devices on all nodes.
2106
2107   Args:
2108     instance: a ganeti.objects.Instance object
2109     ignore_secondaries: if true, errors on secondary nodes won't result
2110                         in an error return from the function
2111
2112   Returns:
2113     false if the operation failed
2114     list of (host, instance_visible_name, node_visible_name) if the operation
2115          suceeded with the mapping from node devices to instance devices
2116   """
2117   device_info = []
2118   disks_ok = True
2119   iname = instance.name
2120   # With the two passes mechanism we try to reduce the window of
2121   # opportunity for the race condition of switching DRBD to primary
2122   # before handshaking occured, but we do not eliminate it
2123
2124   # The proper fix would be to wait (with some limits) until the
2125   # connection has been made and drbd transitions from WFConnection
2126   # into any other network-connected state (Connected, SyncTarget,
2127   # SyncSource, etc.)
2128
2129   # 1st pass, assemble on all nodes in secondary mode
2130   for inst_disk in instance.disks:
2131     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2132       cfg.SetDiskID(node_disk, node)
2133       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2134       if not result:
2135         logger.Error("could not prepare block device %s on node %s"
2136                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2137         if not ignore_secondaries:
2138           disks_ok = False
2139
2140   # FIXME: race condition on drbd migration to primary
2141
2142   # 2nd pass, do only the primary node
2143   for inst_disk in instance.disks:
2144     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2145       if node != instance.primary_node:
2146         continue
2147       cfg.SetDiskID(node_disk, node)
2148       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2149       if not result:
2150         logger.Error("could not prepare block device %s on node %s"
2151                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2152         disks_ok = False
2153     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2154
2155   # leave the disks configured for the primary node
2156   # this is a workaround that would be fixed better by
2157   # improving the logical/physical id handling
2158   for disk in instance.disks:
2159     cfg.SetDiskID(disk, instance.primary_node)
2160
2161   return disks_ok, device_info
2162
2163
2164 def _StartInstanceDisks(cfg, instance, force):
2165   """Start the disks of an instance.
2166
2167   """
2168   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2169                                            ignore_secondaries=force)
2170   if not disks_ok:
2171     _ShutdownInstanceDisks(instance, cfg)
2172     if force is not None and not force:
2173       logger.Error("If the message above refers to a secondary node,"
2174                    " you can retry the operation using '--force'.")
2175     raise errors.OpExecError("Disk consistency error")
2176
2177
2178 class LUDeactivateInstanceDisks(NoHooksLU):
2179   """Shutdown an instance's disks.
2180
2181   """
2182   _OP_REQP = ["instance_name"]
2183
2184   def CheckPrereq(self):
2185     """Check prerequisites.
2186
2187     This checks that the instance is in the cluster.
2188
2189     """
2190     instance = self.cfg.GetInstanceInfo(
2191       self.cfg.ExpandInstanceName(self.op.instance_name))
2192     if instance is None:
2193       raise errors.OpPrereqError("Instance '%s' not known" %
2194                                  self.op.instance_name)
2195     self.instance = instance
2196
2197   def Exec(self, feedback_fn):
2198     """Deactivate the disks
2199
2200     """
2201     instance = self.instance
2202     ins_l = rpc.call_instance_list([instance.primary_node])
2203     ins_l = ins_l[instance.primary_node]
2204     if not type(ins_l) is list:
2205       raise errors.OpExecError("Can't contact node '%s'" %
2206                                instance.primary_node)
2207
2208     if self.instance.name in ins_l:
2209       raise errors.OpExecError("Instance is running, can't shutdown"
2210                                " block devices.")
2211
2212     _ShutdownInstanceDisks(instance, self.cfg)
2213
2214
2215 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2216   """Shutdown block devices of an instance.
2217
2218   This does the shutdown on all nodes of the instance.
2219
2220   If the ignore_primary is false, errors on the primary node are
2221   ignored.
2222
2223   """
2224   result = True
2225   for disk in instance.disks:
2226     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2227       cfg.SetDiskID(top_disk, node)
2228       if not rpc.call_blockdev_shutdown(node, top_disk):
2229         logger.Error("could not shutdown block device %s on node %s" %
2230                      (disk.iv_name, node))
2231         if not ignore_primary or node != instance.primary_node:
2232           result = False
2233   return result
2234
2235
2236 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2237   """Checks if a node has enough free memory.
2238
2239   This function check if a given node has the needed amount of free
2240   memory. In case the node has less memory or we cannot get the
2241   information from the node, this function raise an OpPrereqError
2242   exception.
2243
2244   Args:
2245     - cfg: a ConfigWriter instance
2246     - node: the node name
2247     - reason: string to use in the error message
2248     - requested: the amount of memory in MiB
2249
2250   """
2251   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2252   if not nodeinfo or not isinstance(nodeinfo, dict):
2253     raise errors.OpPrereqError("Could not contact node %s for resource"
2254                              " information" % (node,))
2255
2256   free_mem = nodeinfo[node].get('memory_free')
2257   if not isinstance(free_mem, int):
2258     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2259                              " was '%s'" % (node, free_mem))
2260   if requested > free_mem:
2261     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2262                              " needed %s MiB, available %s MiB" %
2263                              (node, reason, requested, free_mem))
2264
2265
2266 class LUStartupInstance(LogicalUnit):
2267   """Starts an instance.
2268
2269   """
2270   HPATH = "instance-start"
2271   HTYPE = constants.HTYPE_INSTANCE
2272   _OP_REQP = ["instance_name", "force"]
2273
2274   def BuildHooksEnv(self):
2275     """Build hooks env.
2276
2277     This runs on master, primary and secondary nodes of the instance.
2278
2279     """
2280     env = {
2281       "FORCE": self.op.force,
2282       }
2283     env.update(_BuildInstanceHookEnvByObject(self.instance))
2284     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2285           list(self.instance.secondary_nodes))
2286     return env, nl, nl
2287
2288   def CheckPrereq(self):
2289     """Check prerequisites.
2290
2291     This checks that the instance is in the cluster.
2292
2293     """
2294     instance = self.cfg.GetInstanceInfo(
2295       self.cfg.ExpandInstanceName(self.op.instance_name))
2296     if instance is None:
2297       raise errors.OpPrereqError("Instance '%s' not known" %
2298                                  self.op.instance_name)
2299
2300     # check bridges existance
2301     _CheckInstanceBridgesExist(instance)
2302
2303     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2304                          "starting instance %s" % instance.name,
2305                          instance.memory)
2306
2307     self.instance = instance
2308     self.op.instance_name = instance.name
2309
2310   def Exec(self, feedback_fn):
2311     """Start the instance.
2312
2313     """
2314     instance = self.instance
2315     force = self.op.force
2316     extra_args = getattr(self.op, "extra_args", "")
2317
2318     self.cfg.MarkInstanceUp(instance.name)
2319
2320     node_current = instance.primary_node
2321
2322     _StartInstanceDisks(self.cfg, instance, force)
2323
2324     if not rpc.call_instance_start(node_current, instance, extra_args):
2325       _ShutdownInstanceDisks(instance, self.cfg)
2326       raise errors.OpExecError("Could not start instance")
2327
2328
2329 class LURebootInstance(LogicalUnit):
2330   """Reboot an instance.
2331
2332   """
2333   HPATH = "instance-reboot"
2334   HTYPE = constants.HTYPE_INSTANCE
2335   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2336
2337   def BuildHooksEnv(self):
2338     """Build hooks env.
2339
2340     This runs on master, primary and secondary nodes of the instance.
2341
2342     """
2343     env = {
2344       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2345       }
2346     env.update(_BuildInstanceHookEnvByObject(self.instance))
2347     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2348           list(self.instance.secondary_nodes))
2349     return env, nl, nl
2350
2351   def CheckPrereq(self):
2352     """Check prerequisites.
2353
2354     This checks that the instance is in the cluster.
2355
2356     """
2357     instance = self.cfg.GetInstanceInfo(
2358       self.cfg.ExpandInstanceName(self.op.instance_name))
2359     if instance is None:
2360       raise errors.OpPrereqError("Instance '%s' not known" %
2361                                  self.op.instance_name)
2362
2363     # check bridges existance
2364     _CheckInstanceBridgesExist(instance)
2365
2366     self.instance = instance
2367     self.op.instance_name = instance.name
2368
2369   def Exec(self, feedback_fn):
2370     """Reboot the instance.
2371
2372     """
2373     instance = self.instance
2374     ignore_secondaries = self.op.ignore_secondaries
2375     reboot_type = self.op.reboot_type
2376     extra_args = getattr(self.op, "extra_args", "")
2377
2378     node_current = instance.primary_node
2379
2380     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2381                            constants.INSTANCE_REBOOT_HARD,
2382                            constants.INSTANCE_REBOOT_FULL]:
2383       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2384                                   (constants.INSTANCE_REBOOT_SOFT,
2385                                    constants.INSTANCE_REBOOT_HARD,
2386                                    constants.INSTANCE_REBOOT_FULL))
2387
2388     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2389                        constants.INSTANCE_REBOOT_HARD]:
2390       if not rpc.call_instance_reboot(node_current, instance,
2391                                       reboot_type, extra_args):
2392         raise errors.OpExecError("Could not reboot instance")
2393     else:
2394       if not rpc.call_instance_shutdown(node_current, instance):
2395         raise errors.OpExecError("could not shutdown instance for full reboot")
2396       _ShutdownInstanceDisks(instance, self.cfg)
2397       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2398       if not rpc.call_instance_start(node_current, instance, extra_args):
2399         _ShutdownInstanceDisks(instance, self.cfg)
2400         raise errors.OpExecError("Could not start instance for full reboot")
2401
2402     self.cfg.MarkInstanceUp(instance.name)
2403
2404
2405 class LUShutdownInstance(LogicalUnit):
2406   """Shutdown an instance.
2407
2408   """
2409   HPATH = "instance-stop"
2410   HTYPE = constants.HTYPE_INSTANCE
2411   _OP_REQP = ["instance_name"]
2412
2413   def BuildHooksEnv(self):
2414     """Build hooks env.
2415
2416     This runs on master, primary and secondary nodes of the instance.
2417
2418     """
2419     env = _BuildInstanceHookEnvByObject(self.instance)
2420     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2421           list(self.instance.secondary_nodes))
2422     return env, nl, nl
2423
2424   def CheckPrereq(self):
2425     """Check prerequisites.
2426
2427     This checks that the instance is in the cluster.
2428
2429     """
2430     instance = self.cfg.GetInstanceInfo(
2431       self.cfg.ExpandInstanceName(self.op.instance_name))
2432     if instance is None:
2433       raise errors.OpPrereqError("Instance '%s' not known" %
2434                                  self.op.instance_name)
2435     self.instance = instance
2436
2437   def Exec(self, feedback_fn):
2438     """Shutdown the instance.
2439
2440     """
2441     instance = self.instance
2442     node_current = instance.primary_node
2443     self.cfg.MarkInstanceDown(instance.name)
2444     if not rpc.call_instance_shutdown(node_current, instance):
2445       logger.Error("could not shutdown instance")
2446
2447     _ShutdownInstanceDisks(instance, self.cfg)
2448
2449
2450 class LUReinstallInstance(LogicalUnit):
2451   """Reinstall an instance.
2452
2453   """
2454   HPATH = "instance-reinstall"
2455   HTYPE = constants.HTYPE_INSTANCE
2456   _OP_REQP = ["instance_name"]
2457
2458   def BuildHooksEnv(self):
2459     """Build hooks env.
2460
2461     This runs on master, primary and secondary nodes of the instance.
2462
2463     """
2464     env = _BuildInstanceHookEnvByObject(self.instance)
2465     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2466           list(self.instance.secondary_nodes))
2467     return env, nl, nl
2468
2469   def CheckPrereq(self):
2470     """Check prerequisites.
2471
2472     This checks that the instance is in the cluster and is not running.
2473
2474     """
2475     instance = self.cfg.GetInstanceInfo(
2476       self.cfg.ExpandInstanceName(self.op.instance_name))
2477     if instance is None:
2478       raise errors.OpPrereqError("Instance '%s' not known" %
2479                                  self.op.instance_name)
2480     if instance.disk_template == constants.DT_DISKLESS:
2481       raise errors.OpPrereqError("Instance '%s' has no disks" %
2482                                  self.op.instance_name)
2483     if instance.status != "down":
2484       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2485                                  self.op.instance_name)
2486     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2487     if remote_info:
2488       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2489                                  (self.op.instance_name,
2490                                   instance.primary_node))
2491
2492     self.op.os_type = getattr(self.op, "os_type", None)
2493     if self.op.os_type is not None:
2494       # OS verification
2495       pnode = self.cfg.GetNodeInfo(
2496         self.cfg.ExpandNodeName(instance.primary_node))
2497       if pnode is None:
2498         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2499                                    self.op.pnode)
2500       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2501       if not os_obj:
2502         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2503                                    " primary node"  % self.op.os_type)
2504
2505     self.instance = instance
2506
2507   def Exec(self, feedback_fn):
2508     """Reinstall the instance.
2509
2510     """
2511     inst = self.instance
2512
2513     if self.op.os_type is not None:
2514       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2515       inst.os = self.op.os_type
2516       self.cfg.AddInstance(inst)
2517
2518     _StartInstanceDisks(self.cfg, inst, None)
2519     try:
2520       feedback_fn("Running the instance OS create scripts...")
2521       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2522         raise errors.OpExecError("Could not install OS for instance %s"
2523                                  " on node %s" %
2524                                  (inst.name, inst.primary_node))
2525     finally:
2526       _ShutdownInstanceDisks(inst, self.cfg)
2527
2528
2529 class LURenameInstance(LogicalUnit):
2530   """Rename an instance.
2531
2532   """
2533   HPATH = "instance-rename"
2534   HTYPE = constants.HTYPE_INSTANCE
2535   _OP_REQP = ["instance_name", "new_name"]
2536
2537   def BuildHooksEnv(self):
2538     """Build hooks env.
2539
2540     This runs on master, primary and secondary nodes of the instance.
2541
2542     """
2543     env = _BuildInstanceHookEnvByObject(self.instance)
2544     env["INSTANCE_NEW_NAME"] = self.op.new_name
2545     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2546           list(self.instance.secondary_nodes))
2547     return env, nl, nl
2548
2549   def CheckPrereq(self):
2550     """Check prerequisites.
2551
2552     This checks that the instance is in the cluster and is not running.
2553
2554     """
2555     instance = self.cfg.GetInstanceInfo(
2556       self.cfg.ExpandInstanceName(self.op.instance_name))
2557     if instance is None:
2558       raise errors.OpPrereqError("Instance '%s' not known" %
2559                                  self.op.instance_name)
2560     if instance.status != "down":
2561       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2562                                  self.op.instance_name)
2563     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2564     if remote_info:
2565       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2566                                  (self.op.instance_name,
2567                                   instance.primary_node))
2568     self.instance = instance
2569
2570     # new name verification
2571     name_info = utils.HostInfo(self.op.new_name)
2572
2573     self.op.new_name = new_name = name_info.name
2574     instance_list = self.cfg.GetInstanceList()
2575     if new_name in instance_list:
2576       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2577                                  new_name)
2578
2579     if not getattr(self.op, "ignore_ip", False):
2580       command = ["fping", "-q", name_info.ip]
2581       result = utils.RunCmd(command)
2582       if not result.failed:
2583         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2584                                    (name_info.ip, new_name))
2585
2586
2587   def Exec(self, feedback_fn):
2588     """Reinstall the instance.
2589
2590     """
2591     inst = self.instance
2592     old_name = inst.name
2593
2594     self.cfg.RenameInstance(inst.name, self.op.new_name)
2595
2596     # re-read the instance from the configuration after rename
2597     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2598
2599     _StartInstanceDisks(self.cfg, inst, None)
2600     try:
2601       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2602                                           "sda", "sdb"):
2603         msg = ("Could run OS rename script for instance %s on node %s (but the"
2604                " instance has been renamed in Ganeti)" %
2605                (inst.name, inst.primary_node))
2606         logger.Error(msg)
2607     finally:
2608       _ShutdownInstanceDisks(inst, self.cfg)
2609
2610
2611 class LURemoveInstance(LogicalUnit):
2612   """Remove an instance.
2613
2614   """
2615   HPATH = "instance-remove"
2616   HTYPE = constants.HTYPE_INSTANCE
2617   _OP_REQP = ["instance_name"]
2618
2619   def BuildHooksEnv(self):
2620     """Build hooks env.
2621
2622     This runs on master, primary and secondary nodes of the instance.
2623
2624     """
2625     env = _BuildInstanceHookEnvByObject(self.instance)
2626     nl = [self.sstore.GetMasterNode()]
2627     return env, nl, nl
2628
2629   def CheckPrereq(self):
2630     """Check prerequisites.
2631
2632     This checks that the instance is in the cluster.
2633
2634     """
2635     instance = self.cfg.GetInstanceInfo(
2636       self.cfg.ExpandInstanceName(self.op.instance_name))
2637     if instance is None:
2638       raise errors.OpPrereqError("Instance '%s' not known" %
2639                                  self.op.instance_name)
2640     self.instance = instance
2641
2642   def Exec(self, feedback_fn):
2643     """Remove the instance.
2644
2645     """
2646     instance = self.instance
2647     logger.Info("shutting down instance %s on node %s" %
2648                 (instance.name, instance.primary_node))
2649
2650     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2651       if self.op.ignore_failures:
2652         feedback_fn("Warning: can't shutdown instance")
2653       else:
2654         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2655                                  (instance.name, instance.primary_node))
2656
2657     logger.Info("removing block devices for instance %s" % instance.name)
2658
2659     if not _RemoveDisks(instance, self.cfg):
2660       if self.op.ignore_failures:
2661         feedback_fn("Warning: can't remove instance's disks")
2662       else:
2663         raise errors.OpExecError("Can't remove instance's disks")
2664
2665     logger.Info("removing instance %s out of cluster config" % instance.name)
2666
2667     self.cfg.RemoveInstance(instance.name)
2668
2669
2670 class LUQueryInstances(NoHooksLU):
2671   """Logical unit for querying instances.
2672
2673   """
2674   _OP_REQP = ["output_fields", "names"]
2675
2676   def CheckPrereq(self):
2677     """Check prerequisites.
2678
2679     This checks that the fields required are valid output fields.
2680
2681     """
2682     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2683     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2684                                "admin_state", "admin_ram",
2685                                "disk_template", "ip", "mac", "bridge",
2686                                "sda_size", "sdb_size", "vcpus"],
2687                        dynamic=self.dynamic_fields,
2688                        selected=self.op.output_fields)
2689
2690     self.wanted = _GetWantedInstances(self, self.op.names)
2691
2692   def Exec(self, feedback_fn):
2693     """Computes the list of nodes and their attributes.
2694
2695     """
2696     instance_names = self.wanted
2697     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2698                      in instance_names]
2699
2700     # begin data gathering
2701
2702     nodes = frozenset([inst.primary_node for inst in instance_list])
2703
2704     bad_nodes = []
2705     if self.dynamic_fields.intersection(self.op.output_fields):
2706       live_data = {}
2707       node_data = rpc.call_all_instances_info(nodes)
2708       for name in nodes:
2709         result = node_data[name]
2710         if result:
2711           live_data.update(result)
2712         elif result == False:
2713           bad_nodes.append(name)
2714         # else no instance is alive
2715     else:
2716       live_data = dict([(name, {}) for name in instance_names])
2717
2718     # end data gathering
2719
2720     output = []
2721     for instance in instance_list:
2722       iout = []
2723       for field in self.op.output_fields:
2724         if field == "name":
2725           val = instance.name
2726         elif field == "os":
2727           val = instance.os
2728         elif field == "pnode":
2729           val = instance.primary_node
2730         elif field == "snodes":
2731           val = list(instance.secondary_nodes)
2732         elif field == "admin_state":
2733           val = (instance.status != "down")
2734         elif field == "oper_state":
2735           if instance.primary_node in bad_nodes:
2736             val = None
2737           else:
2738             val = bool(live_data.get(instance.name))
2739         elif field == "status":
2740           if instance.primary_node in bad_nodes:
2741             val = "ERROR_nodedown"
2742           else:
2743             running = bool(live_data.get(instance.name))
2744             if running:
2745               if instance.status != "down":
2746                 val = "running"
2747               else:
2748                 val = "ERROR_up"
2749             else:
2750               if instance.status != "down":
2751                 val = "ERROR_down"
2752               else:
2753                 val = "ADMIN_down"
2754         elif field == "admin_ram":
2755           val = instance.memory
2756         elif field == "oper_ram":
2757           if instance.primary_node in bad_nodes:
2758             val = None
2759           elif instance.name in live_data:
2760             val = live_data[instance.name].get("memory", "?")
2761           else:
2762             val = "-"
2763         elif field == "disk_template":
2764           val = instance.disk_template
2765         elif field == "ip":
2766           val = instance.nics[0].ip
2767         elif field == "bridge":
2768           val = instance.nics[0].bridge
2769         elif field == "mac":
2770           val = instance.nics[0].mac
2771         elif field == "sda_size" or field == "sdb_size":
2772           disk = instance.FindDisk(field[:3])
2773           if disk is None:
2774             val = None
2775           else:
2776             val = disk.size
2777         elif field == "vcpus":
2778           val = instance.vcpus
2779         else:
2780           raise errors.ParameterError(field)
2781         iout.append(val)
2782       output.append(iout)
2783
2784     return output
2785
2786
2787 class LUFailoverInstance(LogicalUnit):
2788   """Failover an instance.
2789
2790   """
2791   HPATH = "instance-failover"
2792   HTYPE = constants.HTYPE_INSTANCE
2793   _OP_REQP = ["instance_name", "ignore_consistency"]
2794
2795   def BuildHooksEnv(self):
2796     """Build hooks env.
2797
2798     This runs on master, primary and secondary nodes of the instance.
2799
2800     """
2801     env = {
2802       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2803       }
2804     env.update(_BuildInstanceHookEnvByObject(self.instance))
2805     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2806     return env, nl, nl
2807
2808   def CheckPrereq(self):
2809     """Check prerequisites.
2810
2811     This checks that the instance is in the cluster.
2812
2813     """
2814     instance = self.cfg.GetInstanceInfo(
2815       self.cfg.ExpandInstanceName(self.op.instance_name))
2816     if instance is None:
2817       raise errors.OpPrereqError("Instance '%s' not known" %
2818                                  self.op.instance_name)
2819
2820     if instance.disk_template not in constants.DTS_NET_MIRROR:
2821       raise errors.OpPrereqError("Instance's disk layout is not"
2822                                  " network mirrored, cannot failover.")
2823
2824     secondary_nodes = instance.secondary_nodes
2825     if not secondary_nodes:
2826       raise errors.ProgrammerError("no secondary node but using "
2827                                    "DT_REMOTE_RAID1 template")
2828
2829     target_node = secondary_nodes[0]
2830     # check memory requirements on the secondary node
2831     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2832                          instance.name, instance.memory)
2833
2834     # check bridge existance
2835     brlist = [nic.bridge for nic in instance.nics]
2836     if not rpc.call_bridges_exist(target_node, brlist):
2837       raise errors.OpPrereqError("One or more target bridges %s does not"
2838                                  " exist on destination node '%s'" %
2839                                  (brlist, target_node))
2840
2841     self.instance = instance
2842
2843   def Exec(self, feedback_fn):
2844     """Failover an instance.
2845
2846     The failover is done by shutting it down on its present node and
2847     starting it on the secondary.
2848
2849     """
2850     instance = self.instance
2851
2852     source_node = instance.primary_node
2853     target_node = instance.secondary_nodes[0]
2854
2855     feedback_fn("* checking disk consistency between source and target")
2856     for dev in instance.disks:
2857       # for remote_raid1, these are md over drbd
2858       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2859         if instance.status == "up" and not self.op.ignore_consistency:
2860           raise errors.OpExecError("Disk %s is degraded on target node,"
2861                                    " aborting failover." % dev.iv_name)
2862
2863     feedback_fn("* shutting down instance on source node")
2864     logger.Info("Shutting down instance %s on node %s" %
2865                 (instance.name, source_node))
2866
2867     if not rpc.call_instance_shutdown(source_node, instance):
2868       if self.op.ignore_consistency:
2869         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2870                      " anyway. Please make sure node %s is down"  %
2871                      (instance.name, source_node, source_node))
2872       else:
2873         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2874                                  (instance.name, source_node))
2875
2876     feedback_fn("* deactivating the instance's disks on source node")
2877     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2878       raise errors.OpExecError("Can't shut down the instance's disks.")
2879
2880     instance.primary_node = target_node
2881     # distribute new instance config to the other nodes
2882     self.cfg.AddInstance(instance)
2883
2884     # Only start the instance if it's marked as up
2885     if instance.status == "up":
2886       feedback_fn("* activating the instance's disks on target node")
2887       logger.Info("Starting instance %s on node %s" %
2888                   (instance.name, target_node))
2889
2890       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2891                                                ignore_secondaries=True)
2892       if not disks_ok:
2893         _ShutdownInstanceDisks(instance, self.cfg)
2894         raise errors.OpExecError("Can't activate the instance's disks")
2895
2896       feedback_fn("* starting the instance on the target node")
2897       if not rpc.call_instance_start(target_node, instance, None):
2898         _ShutdownInstanceDisks(instance, self.cfg)
2899         raise errors.OpExecError("Could not start instance %s on node %s." %
2900                                  (instance.name, target_node))
2901
2902
2903 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2904   """Create a tree of block devices on the primary node.
2905
2906   This always creates all devices.
2907
2908   """
2909   if device.children:
2910     for child in device.children:
2911       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2912         return False
2913
2914   cfg.SetDiskID(device, node)
2915   new_id = rpc.call_blockdev_create(node, device, device.size,
2916                                     instance.name, True, info)
2917   if not new_id:
2918     return False
2919   if device.physical_id is None:
2920     device.physical_id = new_id
2921   return True
2922
2923
2924 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2925   """Create a tree of block devices on a secondary node.
2926
2927   If this device type has to be created on secondaries, create it and
2928   all its children.
2929
2930   If not, just recurse to children keeping the same 'force' value.
2931
2932   """
2933   if device.CreateOnSecondary():
2934     force = True
2935   if device.children:
2936     for child in device.children:
2937       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2938                                         child, force, info):
2939         return False
2940
2941   if not force:
2942     return True
2943   cfg.SetDiskID(device, node)
2944   new_id = rpc.call_blockdev_create(node, device, device.size,
2945                                     instance.name, False, info)
2946   if not new_id:
2947     return False
2948   if device.physical_id is None:
2949     device.physical_id = new_id
2950   return True
2951
2952
2953 def _GenerateUniqueNames(cfg, exts):
2954   """Generate a suitable LV name.
2955
2956   This will generate a logical volume name for the given instance.
2957
2958   """
2959   results = []
2960   for val in exts:
2961     new_id = cfg.GenerateUniqueID()
2962     results.append("%s%s" % (new_id, val))
2963   return results
2964
2965
2966 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2967   """Generate a drbd device complete with its children.
2968
2969   """
2970   port = cfg.AllocatePort()
2971   vgname = cfg.GetVGName()
2972   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2973                           logical_id=(vgname, names[0]))
2974   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2975                           logical_id=(vgname, names[1]))
2976   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2977                           logical_id = (primary, secondary, port),
2978                           children = [dev_data, dev_meta])
2979   return drbd_dev
2980
2981
2982 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2983   """Generate a drbd8 device complete with its children.
2984
2985   """
2986   port = cfg.AllocatePort()
2987   vgname = cfg.GetVGName()
2988   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2989                           logical_id=(vgname, names[0]))
2990   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2991                           logical_id=(vgname, names[1]))
2992   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2993                           logical_id = (primary, secondary, port),
2994                           children = [dev_data, dev_meta],
2995                           iv_name=iv_name)
2996   return drbd_dev
2997
2998 def _GenerateDiskTemplate(cfg, template_name,
2999                           instance_name, primary_node,
3000                           secondary_nodes, disk_sz, swap_sz):
3001   """Generate the entire disk layout for a given template type.
3002
3003   """
3004   #TODO: compute space requirements
3005
3006   vgname = cfg.GetVGName()
3007   if template_name == constants.DT_DISKLESS:
3008     disks = []
3009   elif template_name == constants.DT_PLAIN:
3010     if len(secondary_nodes) != 0:
3011       raise errors.ProgrammerError("Wrong template configuration")
3012
3013     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3014     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3015                            logical_id=(vgname, names[0]),
3016                            iv_name = "sda")
3017     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3018                            logical_id=(vgname, names[1]),
3019                            iv_name = "sdb")
3020     disks = [sda_dev, sdb_dev]
3021   elif template_name == constants.DT_LOCAL_RAID1:
3022     if len(secondary_nodes) != 0:
3023       raise errors.ProgrammerError("Wrong template configuration")
3024
3025
3026     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3027                                        ".sdb_m1", ".sdb_m2"])
3028     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3029                               logical_id=(vgname, names[0]))
3030     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3031                               logical_id=(vgname, names[1]))
3032     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3033                               size=disk_sz,
3034                               children = [sda_dev_m1, sda_dev_m2])
3035     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3036                               logical_id=(vgname, names[2]))
3037     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3038                               logical_id=(vgname, names[3]))
3039     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3040                               size=swap_sz,
3041                               children = [sdb_dev_m1, sdb_dev_m2])
3042     disks = [md_sda_dev, md_sdb_dev]
3043   elif template_name == constants.DT_REMOTE_RAID1:
3044     if len(secondary_nodes) != 1:
3045       raise errors.ProgrammerError("Wrong template configuration")
3046     remote_node = secondary_nodes[0]
3047     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3048                                        ".sdb_data", ".sdb_meta"])
3049     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3050                                          disk_sz, names[0:2])
3051     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3052                               children = [drbd_sda_dev], size=disk_sz)
3053     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3054                                          swap_sz, names[2:4])
3055     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3056                               children = [drbd_sdb_dev], size=swap_sz)
3057     disks = [md_sda_dev, md_sdb_dev]
3058   elif template_name == constants.DT_DRBD8:
3059     if len(secondary_nodes) != 1:
3060       raise errors.ProgrammerError("Wrong template configuration")
3061     remote_node = secondary_nodes[0]
3062     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3063                                        ".sdb_data", ".sdb_meta"])
3064     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3065                                          disk_sz, names[0:2], "sda")
3066     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3067                                          swap_sz, names[2:4], "sdb")
3068     disks = [drbd_sda_dev, drbd_sdb_dev]
3069   else:
3070     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3071   return disks
3072
3073
3074 def _GetInstanceInfoText(instance):
3075   """Compute that text that should be added to the disk's metadata.
3076
3077   """
3078   return "originstname+%s" % instance.name
3079
3080
3081 def _CreateDisks(cfg, instance):
3082   """Create all disks for an instance.
3083
3084   This abstracts away some work from AddInstance.
3085
3086   Args:
3087     instance: the instance object
3088
3089   Returns:
3090     True or False showing the success of the creation process
3091
3092   """
3093   info = _GetInstanceInfoText(instance)
3094
3095   for device in instance.disks:
3096     logger.Info("creating volume %s for instance %s" %
3097               (device.iv_name, instance.name))
3098     #HARDCODE
3099     for secondary_node in instance.secondary_nodes:
3100       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3101                                         device, False, info):
3102         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3103                      (device.iv_name, device, secondary_node))
3104         return False
3105     #HARDCODE
3106     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3107                                     instance, device, info):
3108       logger.Error("failed to create volume %s on primary!" %
3109                    device.iv_name)
3110       return False
3111   return True
3112
3113
3114 def _RemoveDisks(instance, cfg):
3115   """Remove all disks for an instance.
3116
3117   This abstracts away some work from `AddInstance()` and
3118   `RemoveInstance()`. Note that in case some of the devices couldn't
3119   be removed, the removal will continue with the other ones (compare
3120   with `_CreateDisks()`).
3121
3122   Args:
3123     instance: the instance object
3124
3125   Returns:
3126     True or False showing the success of the removal proces
3127
3128   """
3129   logger.Info("removing block devices for instance %s" % instance.name)
3130
3131   result = True
3132   for device in instance.disks:
3133     for node, disk in device.ComputeNodeTree(instance.primary_node):
3134       cfg.SetDiskID(disk, node)
3135       if not rpc.call_blockdev_remove(node, disk):
3136         logger.Error("could not remove block device %s on node %s,"
3137                      " continuing anyway" %
3138                      (device.iv_name, node))
3139         result = False
3140   return result
3141
3142
3143 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3144   """Compute disk size requirements in the volume group
3145
3146   This is currently hard-coded for the two-drive layout.
3147
3148   """
3149   # Required free disk space as a function of disk and swap space
3150   req_size_dict = {
3151     constants.DT_DISKLESS: None,
3152     constants.DT_PLAIN: disk_size + swap_size,
3153     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3154     # 256 MB are added for drbd metadata, 128MB for each drbd device
3155     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3156     constants.DT_DRBD8: disk_size + swap_size + 256,
3157   }
3158
3159   if disk_template not in req_size_dict:
3160     raise errors.ProgrammerError("Disk template '%s' size requirement"
3161                                  " is unknown" %  disk_template)
3162
3163   return req_size_dict[disk_template]
3164
3165
3166 class LUCreateInstance(LogicalUnit):
3167   """Create an instance.
3168
3169   """
3170   HPATH = "instance-add"
3171   HTYPE = constants.HTYPE_INSTANCE
3172   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3173               "disk_template", "swap_size", "mode", "start", "vcpus",
3174               "wait_for_sync", "ip_check", "mac"]
3175
3176   def _RunAllocator(self):
3177     """Run the allocator based on input opcode.
3178
3179     """
3180     disks = [{"size": self.op.disk_size, "mode": "w"},
3181              {"size": self.op.swap_size, "mode": "w"}]
3182     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3183              "bridge": self.op.bridge}]
3184     ial = IAllocator(self.cfg, self.sstore,
3185                      mode=constants.IALLOCATOR_MODE_ALLOC,
3186                      name=self.op.instance_name,
3187                      disk_template=self.op.disk_template,
3188                      tags=[],
3189                      os=self.op.os_type,
3190                      vcpus=self.op.vcpus,
3191                      mem_size=self.op.mem_size,
3192                      disks=disks,
3193                      nics=nics,
3194                      )
3195
3196     ial.Run(self.op.iallocator)
3197
3198     if not ial.success:
3199       raise errors.OpPrereqError("Can't compute nodes using"
3200                                  " iallocator '%s': %s" % (self.op.iallocator,
3201                                                            ial.info))
3202     if len(ial.nodes) != ial.required_nodes:
3203       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3204                                  " of nodes (%s), required %s" %
3205                                  (len(ial.nodes), ial.required_nodes))
3206     self.op.pnode = ial.nodes[0]
3207     logger.ToStdout("Selected nodes for the instance: %s" %
3208                     (", ".join(ial.nodes),))
3209     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3210                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3211     if ial.required_nodes == 2:
3212       self.op.snode = ial.nodes[1]
3213
3214   def BuildHooksEnv(self):
3215     """Build hooks env.
3216
3217     This runs on master, primary and secondary nodes of the instance.
3218
3219     """
3220     env = {
3221       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3222       "INSTANCE_DISK_SIZE": self.op.disk_size,
3223       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3224       "INSTANCE_ADD_MODE": self.op.mode,
3225       }
3226     if self.op.mode == constants.INSTANCE_IMPORT:
3227       env["INSTANCE_SRC_NODE"] = self.op.src_node
3228       env["INSTANCE_SRC_PATH"] = self.op.src_path
3229       env["INSTANCE_SRC_IMAGE"] = self.src_image
3230
3231     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3232       primary_node=self.op.pnode,
3233       secondary_nodes=self.secondaries,
3234       status=self.instance_status,
3235       os_type=self.op.os_type,
3236       memory=self.op.mem_size,
3237       vcpus=self.op.vcpus,
3238       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3239     ))
3240
3241     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3242           self.secondaries)
3243     return env, nl, nl
3244
3245
3246   def CheckPrereq(self):
3247     """Check prerequisites.
3248
3249     """
3250     # set optional parameters to none if they don't exist
3251     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3252                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3253                  "vnc_bind_address"]:
3254       if not hasattr(self.op, attr):
3255         setattr(self.op, attr, None)
3256
3257     if self.op.mode not in (constants.INSTANCE_CREATE,
3258                             constants.INSTANCE_IMPORT):
3259       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3260                                  self.op.mode)
3261
3262     if self.op.mode == constants.INSTANCE_IMPORT:
3263       src_node = getattr(self.op, "src_node", None)
3264       src_path = getattr(self.op, "src_path", None)
3265       if src_node is None or src_path is None:
3266         raise errors.OpPrereqError("Importing an instance requires source"
3267                                    " node and path options")
3268       src_node_full = self.cfg.ExpandNodeName(src_node)
3269       if src_node_full is None:
3270         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3271       self.op.src_node = src_node = src_node_full
3272
3273       if not os.path.isabs(src_path):
3274         raise errors.OpPrereqError("The source path must be absolute")
3275
3276       export_info = rpc.call_export_info(src_node, src_path)
3277
3278       if not export_info:
3279         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3280
3281       if not export_info.has_section(constants.INISECT_EXP):
3282         raise errors.ProgrammerError("Corrupted export config")
3283
3284       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3285       if (int(ei_version) != constants.EXPORT_VERSION):
3286         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3287                                    (ei_version, constants.EXPORT_VERSION))
3288
3289       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3290         raise errors.OpPrereqError("Can't import instance with more than"
3291                                    " one data disk")
3292
3293       # FIXME: are the old os-es, disk sizes, etc. useful?
3294       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3295       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3296                                                          'disk0_dump'))
3297       self.src_image = diskimage
3298     else: # INSTANCE_CREATE
3299       if getattr(self.op, "os_type", None) is None:
3300         raise errors.OpPrereqError("No guest OS specified")
3301
3302     #### instance parameters check
3303
3304     # disk template and mirror node verification
3305     if self.op.disk_template not in constants.DISK_TEMPLATES:
3306       raise errors.OpPrereqError("Invalid disk template name")
3307
3308     # instance name verification
3309     hostname1 = utils.HostInfo(self.op.instance_name)
3310
3311     self.op.instance_name = instance_name = hostname1.name
3312     instance_list = self.cfg.GetInstanceList()
3313     if instance_name in instance_list:
3314       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3315                                  instance_name)
3316
3317     # ip validity checks
3318     ip = getattr(self.op, "ip", None)
3319     if ip is None or ip.lower() == "none":
3320       inst_ip = None
3321     elif ip.lower() == "auto":
3322       inst_ip = hostname1.ip
3323     else:
3324       if not utils.IsValidIP(ip):
3325         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3326                                    " like a valid IP" % ip)
3327       inst_ip = ip
3328     self.inst_ip = self.op.ip = inst_ip
3329
3330     if self.op.start and not self.op.ip_check:
3331       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3332                                  " adding an instance in start mode")
3333
3334     if self.op.ip_check:
3335       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3336         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3337                                    (hostname1.ip, instance_name))
3338
3339     # MAC address verification
3340     if self.op.mac != "auto":
3341       if not utils.IsValidMac(self.op.mac.lower()):
3342         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3343                                    self.op.mac)
3344
3345     # bridge verification
3346     bridge = getattr(self.op, "bridge", None)
3347     if bridge is None:
3348       self.op.bridge = self.cfg.GetDefBridge()
3349     else:
3350       self.op.bridge = bridge
3351
3352     # boot order verification
3353     if self.op.hvm_boot_order is not None:
3354       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3355         raise errors.OpPrereqError("invalid boot order specified,"
3356                                    " must be one or more of [acdn]")
3357     #### allocator run
3358
3359     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3360       raise errors.OpPrereqError("One and only one of iallocator and primary"
3361                                  " node must be given")
3362
3363     if self.op.iallocator is not None:
3364       self._RunAllocator()
3365
3366     #### node related checks
3367
3368     # check primary node
3369     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3370     if pnode is None:
3371       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3372                                  self.op.pnode)
3373     self.op.pnode = pnode.name
3374     self.pnode = pnode
3375     self.secondaries = []
3376
3377     # mirror node verification
3378     if self.op.disk_template in constants.DTS_NET_MIRROR:
3379       if getattr(self.op, "snode", None) is None:
3380         raise errors.OpPrereqError("The networked disk templates need"
3381                                    " a mirror node")
3382
3383       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3384       if snode_name is None:
3385         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3386                                    self.op.snode)
3387       elif snode_name == pnode.name:
3388         raise errors.OpPrereqError("The secondary node cannot be"
3389                                    " the primary node.")
3390       self.secondaries.append(snode_name)
3391
3392     req_size = _ComputeDiskSize(self.op.disk_template,
3393                                 self.op.disk_size, self.op.swap_size)
3394
3395     # Check lv size requirements
3396     if req_size is not None:
3397       nodenames = [pnode.name] + self.secondaries
3398       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3399       for node in nodenames:
3400         info = nodeinfo.get(node, None)
3401         if not info:
3402           raise errors.OpPrereqError("Cannot get current information"
3403                                      " from node '%s'" % nodeinfo)
3404         vg_free = info.get('vg_free', None)
3405         if not isinstance(vg_free, int):
3406           raise errors.OpPrereqError("Can't compute free disk space on"
3407                                      " node %s" % node)
3408         if req_size > info['vg_free']:
3409           raise errors.OpPrereqError("Not enough disk space on target node %s."
3410                                      " %d MB available, %d MB required" %
3411                                      (node, info['vg_free'], req_size))
3412
3413     # os verification
3414     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3415     if not os_obj:
3416       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3417                                  " primary node"  % self.op.os_type)
3418
3419     if self.op.kernel_path == constants.VALUE_NONE:
3420       raise errors.OpPrereqError("Can't set instance kernel to none")
3421
3422
3423     # bridge check on primary node
3424     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3425       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3426                                  " destination node '%s'" %
3427                                  (self.op.bridge, pnode.name))
3428
3429     # hvm_cdrom_image_path verification
3430     if self.op.hvm_cdrom_image_path is not None:
3431       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3432         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3433                                    " be an absolute path or None, not %s" %
3434                                    self.op.hvm_cdrom_image_path)
3435       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3436         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3437                                    " regular file or a symlink pointing to"
3438                                    " an existing regular file, not %s" %
3439                                    self.op.hvm_cdrom_image_path)
3440
3441     # vnc_bind_address verification
3442     if self.op.vnc_bind_address is not None:
3443       if not utils.IsValidIP(self.op.vnc_bind_address):
3444         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3445                                    " like a valid IP address" %
3446                                    self.op.vnc_bind_address)
3447
3448     if self.op.start:
3449       self.instance_status = 'up'
3450     else:
3451       self.instance_status = 'down'
3452
3453   def Exec(self, feedback_fn):
3454     """Create and add the instance to the cluster.
3455
3456     """
3457     instance = self.op.instance_name
3458     pnode_name = self.pnode.name
3459
3460     if self.op.mac == "auto":
3461       mac_address = self.cfg.GenerateMAC()
3462     else:
3463       mac_address = self.op.mac
3464
3465     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3466     if self.inst_ip is not None:
3467       nic.ip = self.inst_ip
3468
3469     ht_kind = self.sstore.GetHypervisorType()
3470     if ht_kind in constants.HTS_REQ_PORT:
3471       network_port = self.cfg.AllocatePort()
3472     else:
3473       network_port = None
3474
3475     if self.op.vnc_bind_address is None:
3476       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3477
3478     disks = _GenerateDiskTemplate(self.cfg,
3479                                   self.op.disk_template,
3480                                   instance, pnode_name,
3481                                   self.secondaries, self.op.disk_size,
3482                                   self.op.swap_size)
3483
3484     iobj = objects.Instance(name=instance, os=self.op.os_type,
3485                             primary_node=pnode_name,
3486                             memory=self.op.mem_size,
3487                             vcpus=self.op.vcpus,
3488                             nics=[nic], disks=disks,
3489                             disk_template=self.op.disk_template,
3490                             status=self.instance_status,
3491                             network_port=network_port,
3492                             kernel_path=self.op.kernel_path,
3493                             initrd_path=self.op.initrd_path,
3494                             hvm_boot_order=self.op.hvm_boot_order,
3495                             hvm_acpi=self.op.hvm_acpi,
3496                             hvm_pae=self.op.hvm_pae,
3497                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3498                             vnc_bind_address=self.op.vnc_bind_address,
3499                             )
3500
3501     feedback_fn("* creating instance disks...")
3502     if not _CreateDisks(self.cfg, iobj):
3503       _RemoveDisks(iobj, self.cfg)
3504       raise errors.OpExecError("Device creation failed, reverting...")
3505
3506     feedback_fn("adding instance %s to cluster config" % instance)
3507
3508     self.cfg.AddInstance(iobj)
3509
3510     if self.op.wait_for_sync:
3511       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3512     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3513       # make sure the disks are not degraded (still sync-ing is ok)
3514       time.sleep(15)
3515       feedback_fn("* checking mirrors status")
3516       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3517     else:
3518       disk_abort = False
3519
3520     if disk_abort:
3521       _RemoveDisks(iobj, self.cfg)
3522       self.cfg.RemoveInstance(iobj.name)
3523       raise errors.OpExecError("There are some degraded disks for"
3524                                " this instance")
3525
3526     feedback_fn("creating os for instance %s on node %s" %
3527                 (instance, pnode_name))
3528
3529     if iobj.disk_template != constants.DT_DISKLESS:
3530       if self.op.mode == constants.INSTANCE_CREATE:
3531         feedback_fn("* running the instance OS create scripts...")
3532         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3533           raise errors.OpExecError("could not add os for instance %s"
3534                                    " on node %s" %
3535                                    (instance, pnode_name))
3536
3537       elif self.op.mode == constants.INSTANCE_IMPORT:
3538         feedback_fn("* running the instance OS import scripts...")
3539         src_node = self.op.src_node
3540         src_image = self.src_image
3541         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3542                                                 src_node, src_image):
3543           raise errors.OpExecError("Could not import os for instance"
3544                                    " %s on node %s" %
3545                                    (instance, pnode_name))
3546       else:
3547         # also checked in the prereq part
3548         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3549                                      % self.op.mode)
3550
3551     if self.op.start:
3552       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3553       feedback_fn("* starting instance...")
3554       if not rpc.call_instance_start(pnode_name, iobj, None):
3555         raise errors.OpExecError("Could not start instance")
3556
3557
3558 class LUConnectConsole(NoHooksLU):
3559   """Connect to an instance's console.
3560
3561   This is somewhat special in that it returns the command line that
3562   you need to run on the master node in order to connect to the
3563   console.
3564
3565   """
3566   _OP_REQP = ["instance_name"]
3567
3568   def CheckPrereq(self):
3569     """Check prerequisites.
3570
3571     This checks that the instance is in the cluster.
3572
3573     """
3574     instance = self.cfg.GetInstanceInfo(
3575       self.cfg.ExpandInstanceName(self.op.instance_name))
3576     if instance is None:
3577       raise errors.OpPrereqError("Instance '%s' not known" %
3578                                  self.op.instance_name)
3579     self.instance = instance
3580
3581   def Exec(self, feedback_fn):
3582     """Connect to the console of an instance
3583
3584     """
3585     instance = self.instance
3586     node = instance.primary_node
3587
3588     node_insts = rpc.call_instance_list([node])[node]
3589     if node_insts is False:
3590       raise errors.OpExecError("Can't connect to node %s." % node)
3591
3592     if instance.name not in node_insts:
3593       raise errors.OpExecError("Instance %s is not running." % instance.name)
3594
3595     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3596
3597     hyper = hypervisor.GetHypervisor()
3598     console_cmd = hyper.GetShellCommandForConsole(instance)
3599     # build ssh cmdline
3600     argv = ["ssh", "-q", "-t"]
3601     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3602     argv.extend(ssh.BATCH_MODE_OPTS)
3603     argv.append(node)
3604     argv.append(console_cmd)
3605     return "ssh", argv
3606
3607
3608 class LUAddMDDRBDComponent(LogicalUnit):
3609   """Adda new mirror member to an instance's disk.
3610
3611   """
3612   HPATH = "mirror-add"
3613   HTYPE = constants.HTYPE_INSTANCE
3614   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3615
3616   def BuildHooksEnv(self):
3617     """Build hooks env.
3618
3619     This runs on the master, the primary and all the secondaries.
3620
3621     """
3622     env = {
3623       "NEW_SECONDARY": self.op.remote_node,
3624       "DISK_NAME": self.op.disk_name,
3625       }
3626     env.update(_BuildInstanceHookEnvByObject(self.instance))
3627     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3628           self.op.remote_node,] + list(self.instance.secondary_nodes)
3629     return env, nl, nl
3630
3631   def CheckPrereq(self):
3632     """Check prerequisites.
3633
3634     This checks that the instance is in the cluster.
3635
3636     """
3637     instance = self.cfg.GetInstanceInfo(
3638       self.cfg.ExpandInstanceName(self.op.instance_name))
3639     if instance is None:
3640       raise errors.OpPrereqError("Instance '%s' not known" %
3641                                  self.op.instance_name)
3642     self.instance = instance
3643
3644     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3645     if remote_node is None:
3646       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3647     self.remote_node = remote_node
3648
3649     if remote_node == instance.primary_node:
3650       raise errors.OpPrereqError("The specified node is the primary node of"
3651                                  " the instance.")
3652
3653     if instance.disk_template != constants.DT_REMOTE_RAID1:
3654       raise errors.OpPrereqError("Instance's disk layout is not"
3655                                  " remote_raid1.")
3656     for disk in instance.disks:
3657       if disk.iv_name == self.op.disk_name:
3658         break
3659     else:
3660       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3661                                  " instance." % self.op.disk_name)
3662     if len(disk.children) > 1:
3663       raise errors.OpPrereqError("The device already has two slave devices."
3664                                  " This would create a 3-disk raid1 which we"
3665                                  " don't allow.")
3666     self.disk = disk
3667
3668   def Exec(self, feedback_fn):
3669     """Add the mirror component
3670
3671     """
3672     disk = self.disk
3673     instance = self.instance
3674
3675     remote_node = self.remote_node
3676     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3677     names = _GenerateUniqueNames(self.cfg, lv_names)
3678     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3679                                      remote_node, disk.size, names)
3680
3681     logger.Info("adding new mirror component on secondary")
3682     #HARDCODE
3683     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3684                                       new_drbd, False,
3685                                       _GetInstanceInfoText(instance)):
3686       raise errors.OpExecError("Failed to create new component on secondary"
3687                                " node %s" % remote_node)
3688
3689     logger.Info("adding new mirror component on primary")
3690     #HARDCODE
3691     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3692                                     instance, new_drbd,
3693                                     _GetInstanceInfoText(instance)):
3694       # remove secondary dev
3695       self.cfg.SetDiskID(new_drbd, remote_node)
3696       rpc.call_blockdev_remove(remote_node, new_drbd)
3697       raise errors.OpExecError("Failed to create volume on primary")
3698
3699     # the device exists now
3700     # call the primary node to add the mirror to md
3701     logger.Info("adding new mirror component to md")
3702     if not rpc.call_blockdev_addchildren(instance.primary_node,
3703                                          disk, [new_drbd]):
3704       logger.Error("Can't add mirror compoment to md!")
3705       self.cfg.SetDiskID(new_drbd, remote_node)
3706       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3707         logger.Error("Can't rollback on secondary")
3708       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3709       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3710         logger.Error("Can't rollback on primary")
3711       raise errors.OpExecError("Can't add mirror component to md array")
3712
3713     disk.children.append(new_drbd)
3714
3715     self.cfg.AddInstance(instance)
3716
3717     _WaitForSync(self.cfg, instance, self.proc)
3718
3719     return 0
3720
3721
3722 class LURemoveMDDRBDComponent(LogicalUnit):
3723   """Remove a component from a remote_raid1 disk.
3724
3725   """
3726   HPATH = "mirror-remove"
3727   HTYPE = constants.HTYPE_INSTANCE
3728   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3729
3730   def BuildHooksEnv(self):
3731     """Build hooks env.
3732
3733     This runs on the master, the primary and all the secondaries.
3734
3735     """
3736     env = {
3737       "DISK_NAME": self.op.disk_name,
3738       "DISK_ID": self.op.disk_id,
3739       "OLD_SECONDARY": self.old_secondary,
3740       }
3741     env.update(_BuildInstanceHookEnvByObject(self.instance))
3742     nl = [self.sstore.GetMasterNode(),
3743           self.instance.primary_node] + list(self.instance.secondary_nodes)
3744     return env, nl, nl
3745
3746   def CheckPrereq(self):
3747     """Check prerequisites.
3748
3749     This checks that the instance is in the cluster.
3750
3751     """
3752     instance = self.cfg.GetInstanceInfo(
3753       self.cfg.ExpandInstanceName(self.op.instance_name))
3754     if instance is None:
3755       raise errors.OpPrereqError("Instance '%s' not known" %
3756                                  self.op.instance_name)
3757     self.instance = instance
3758
3759     if instance.disk_template != constants.DT_REMOTE_RAID1:
3760       raise errors.OpPrereqError("Instance's disk layout is not"
3761                                  " remote_raid1.")
3762     for disk in instance.disks:
3763       if disk.iv_name == self.op.disk_name:
3764         break
3765     else:
3766       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3767                                  " instance." % self.op.disk_name)
3768     for child in disk.children:
3769       if (child.dev_type == constants.LD_DRBD7 and
3770           child.logical_id[2] == self.op.disk_id):
3771         break
3772     else:
3773       raise errors.OpPrereqError("Can't find the device with this port.")
3774
3775     if len(disk.children) < 2:
3776       raise errors.OpPrereqError("Cannot remove the last component from"
3777                                  " a mirror.")
3778     self.disk = disk
3779     self.child = child
3780     if self.child.logical_id[0] == instance.primary_node:
3781       oid = 1
3782     else:
3783       oid = 0
3784     self.old_secondary = self.child.logical_id[oid]
3785
3786   def Exec(self, feedback_fn):
3787     """Remove the mirror component
3788
3789     """
3790     instance = self.instance
3791     disk = self.disk
3792     child = self.child
3793     logger.Info("remove mirror component")
3794     self.cfg.SetDiskID(disk, instance.primary_node)
3795     if not rpc.call_blockdev_removechildren(instance.primary_node,
3796                                             disk, [child]):
3797       raise errors.OpExecError("Can't remove child from mirror.")
3798
3799     for node in child.logical_id[:2]:
3800       self.cfg.SetDiskID(child, node)
3801       if not rpc.call_blockdev_remove(node, child):
3802         logger.Error("Warning: failed to remove device from node %s,"
3803                      " continuing operation." % node)
3804
3805     disk.children.remove(child)
3806     self.cfg.AddInstance(instance)
3807
3808
3809 class LUReplaceDisks(LogicalUnit):
3810   """Replace the disks of an instance.
3811
3812   """
3813   HPATH = "mirrors-replace"
3814   HTYPE = constants.HTYPE_INSTANCE
3815   _OP_REQP = ["instance_name", "mode", "disks"]
3816
3817   def _RunAllocator(self):
3818     """Compute a new secondary node using an IAllocator.
3819
3820     """
3821     ial = IAllocator(self.cfg, self.sstore,
3822                      mode=constants.IALLOCATOR_MODE_RELOC,
3823                      name=self.op.instance_name,
3824                      relocate_from=[self.sec_node])
3825
3826     ial.Run(self.op.iallocator)
3827
3828     if not ial.success:
3829       raise errors.OpPrereqError("Can't compute nodes using"
3830                                  " iallocator '%s': %s" % (self.op.iallocator,
3831                                                            ial.info))
3832     if len(ial.nodes) != ial.required_nodes:
3833       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3834                                  " of nodes (%s), required %s" %
3835                                  (len(ial.nodes), ial.required_nodes))
3836     self.op.remote_node = ial.nodes[0]
3837     logger.ToStdout("Selected new secondary for the instance: %s" %
3838                     self.op.remote_node)
3839
3840   def BuildHooksEnv(self):
3841     """Build hooks env.
3842
3843     This runs on the master, the primary and all the secondaries.
3844
3845     """
3846     env = {
3847       "MODE": self.op.mode,
3848       "NEW_SECONDARY": self.op.remote_node,
3849       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3850       }
3851     env.update(_BuildInstanceHookEnvByObject(self.instance))
3852     nl = [
3853       self.sstore.GetMasterNode(),
3854       self.instance.primary_node,
3855       ]
3856     if self.op.remote_node is not None:
3857       nl.append(self.op.remote_node)
3858     return env, nl, nl
3859
3860   def CheckPrereq(self):
3861     """Check prerequisites.
3862
3863     This checks that the instance is in the cluster.
3864
3865     """
3866     if not hasattr(self.op, "remote_node"):
3867       self.op.remote_node = None
3868
3869     instance = self.cfg.GetInstanceInfo(
3870       self.cfg.ExpandInstanceName(self.op.instance_name))
3871     if instance is None:
3872       raise errors.OpPrereqError("Instance '%s' not known" %
3873                                  self.op.instance_name)
3874     self.instance = instance
3875     self.op.instance_name = instance.name
3876
3877     if instance.disk_template not in constants.DTS_NET_MIRROR:
3878       raise errors.OpPrereqError("Instance's disk layout is not"
3879                                  " network mirrored.")
3880
3881     if len(instance.secondary_nodes) != 1:
3882       raise errors.OpPrereqError("The instance has a strange layout,"
3883                                  " expected one secondary but found %d" %
3884                                  len(instance.secondary_nodes))
3885
3886     self.sec_node = instance.secondary_nodes[0]
3887
3888     ia_name = getattr(self.op, "iallocator", None)
3889     if ia_name is not None:
3890       if self.op.remote_node is not None:
3891         raise errors.OpPrereqError("Give either the iallocator or the new"
3892                                    " secondary, not both")
3893       self.op.remote_node = self._RunAllocator()
3894
3895     remote_node = self.op.remote_node
3896     if remote_node is not None:
3897       remote_node = self.cfg.ExpandNodeName(remote_node)
3898       if remote_node is None:
3899         raise errors.OpPrereqError("Node '%s' not known" %
3900                                    self.op.remote_node)
3901       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3902     else:
3903       self.remote_node_info = None
3904     if remote_node == instance.primary_node:
3905       raise errors.OpPrereqError("The specified node is the primary node of"
3906                                  " the instance.")
3907     elif remote_node == self.sec_node:
3908       if self.op.mode == constants.REPLACE_DISK_SEC:
3909         # this is for DRBD8, where we can't execute the same mode of
3910         # replacement as for drbd7 (no different port allocated)
3911         raise errors.OpPrereqError("Same secondary given, cannot execute"
3912                                    " replacement")
3913       # the user gave the current secondary, switch to
3914       # 'no-replace-secondary' mode for drbd7
3915       remote_node = None
3916     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3917         self.op.mode != constants.REPLACE_DISK_ALL):
3918       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3919                                  " disks replacement, not individual ones")
3920     if instance.disk_template == constants.DT_DRBD8:
3921       if (self.op.mode == constants.REPLACE_DISK_ALL and
3922           remote_node is not None):
3923         # switch to replace secondary mode
3924         self.op.mode = constants.REPLACE_DISK_SEC
3925
3926       if self.op.mode == constants.REPLACE_DISK_ALL:
3927         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3928                                    " secondary disk replacement, not"
3929                                    " both at once")
3930       elif self.op.mode == constants.REPLACE_DISK_PRI:
3931         if remote_node is not None:
3932           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3933                                      " the secondary while doing a primary"
3934                                      " node disk replacement")
3935         self.tgt_node = instance.primary_node
3936         self.oth_node = instance.secondary_nodes[0]
3937       elif self.op.mode == constants.REPLACE_DISK_SEC:
3938         self.new_node = remote_node # this can be None, in which case
3939                                     # we don't change the secondary
3940         self.tgt_node = instance.secondary_nodes[0]
3941         self.oth_node = instance.primary_node
3942       else:
3943         raise errors.ProgrammerError("Unhandled disk replace mode")
3944
3945     for name in self.op.disks:
3946       if instance.FindDisk(name) is None:
3947         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3948                                    (name, instance.name))
3949     self.op.remote_node = remote_node
3950
3951   def _ExecRR1(self, feedback_fn):
3952     """Replace the disks of an instance.
3953
3954     """
3955     instance = self.instance
3956     iv_names = {}
3957     # start of work
3958     if self.op.remote_node is None:
3959       remote_node = self.sec_node
3960     else:
3961       remote_node = self.op.remote_node
3962     cfg = self.cfg
3963     for dev in instance.disks:
3964       size = dev.size
3965       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3966       names = _GenerateUniqueNames(cfg, lv_names)
3967       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3968                                        remote_node, size, names)
3969       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3970       logger.Info("adding new mirror component on secondary for %s" %
3971                   dev.iv_name)
3972       #HARDCODE
3973       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3974                                         new_drbd, False,
3975                                         _GetInstanceInfoText(instance)):
3976         raise errors.OpExecError("Failed to create new component on secondary"
3977                                  " node %s. Full abort, cleanup manually!" %
3978                                  remote_node)
3979
3980       logger.Info("adding new mirror component on primary")
3981       #HARDCODE
3982       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3983                                       instance, new_drbd,
3984                                       _GetInstanceInfoText(instance)):
3985         # remove secondary dev
3986         cfg.SetDiskID(new_drbd, remote_node)
3987         rpc.call_blockdev_remove(remote_node, new_drbd)
3988         raise errors.OpExecError("Failed to create volume on primary!"
3989                                  " Full abort, cleanup manually!!")
3990
3991       # the device exists now
3992       # call the primary node to add the mirror to md
3993       logger.Info("adding new mirror component to md")
3994       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3995                                            [new_drbd]):
3996         logger.Error("Can't add mirror compoment to md!")
3997         cfg.SetDiskID(new_drbd, remote_node)
3998         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3999           logger.Error("Can't rollback on secondary")
4000         cfg.SetDiskID(new_drbd, instance.primary_node)
4001         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4002           logger.Error("Can't rollback on primary")
4003         raise errors.OpExecError("Full abort, cleanup manually!!")
4004
4005       dev.children.append(new_drbd)
4006       cfg.AddInstance(instance)
4007
4008     # this can fail as the old devices are degraded and _WaitForSync
4009     # does a combined result over all disks, so we don't check its
4010     # return value
4011     _WaitForSync(cfg, instance, self.proc, unlock=True)
4012
4013     # so check manually all the devices
4014     for name in iv_names:
4015       dev, child, new_drbd = iv_names[name]
4016       cfg.SetDiskID(dev, instance.primary_node)
4017       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4018       if is_degr:
4019         raise errors.OpExecError("MD device %s is degraded!" % name)
4020       cfg.SetDiskID(new_drbd, instance.primary_node)
4021       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4022       if is_degr:
4023         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4024
4025     for name in iv_names:
4026       dev, child, new_drbd = iv_names[name]
4027       logger.Info("remove mirror %s component" % name)
4028       cfg.SetDiskID(dev, instance.primary_node)
4029       if not rpc.call_blockdev_removechildren(instance.primary_node,
4030                                               dev, [child]):
4031         logger.Error("Can't remove child from mirror, aborting"
4032                      " *this device cleanup*.\nYou need to cleanup manually!!")
4033         continue
4034
4035       for node in child.logical_id[:2]:
4036         logger.Info("remove child device on %s" % node)
4037         cfg.SetDiskID(child, node)
4038         if not rpc.call_blockdev_remove(node, child):
4039           logger.Error("Warning: failed to remove device from node %s,"
4040                        " continuing operation." % node)
4041
4042       dev.children.remove(child)
4043
4044       cfg.AddInstance(instance)
4045
4046   def _ExecD8DiskOnly(self, feedback_fn):
4047     """Replace a disk on the primary or secondary for dbrd8.
4048
4049     The algorithm for replace is quite complicated:
4050       - for each disk to be replaced:
4051         - create new LVs on the target node with unique names
4052         - detach old LVs from the drbd device
4053         - rename old LVs to name_replaced.<time_t>
4054         - rename new LVs to old LVs
4055         - attach the new LVs (with the old names now) to the drbd device
4056       - wait for sync across all devices
4057       - for each modified disk:
4058         - remove old LVs (which have the name name_replaces.<time_t>)
4059
4060     Failures are not very well handled.
4061
4062     """
4063     steps_total = 6
4064     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4065     instance = self.instance
4066     iv_names = {}
4067     vgname = self.cfg.GetVGName()
4068     # start of work
4069     cfg = self.cfg
4070     tgt_node = self.tgt_node
4071     oth_node = self.oth_node
4072
4073     # Step: check device activation
4074     self.proc.LogStep(1, steps_total, "check device existence")
4075     info("checking volume groups")
4076     my_vg = cfg.GetVGName()
4077     results = rpc.call_vg_list([oth_node, tgt_node])
4078     if not results:
4079       raise errors.OpExecError("Can't list volume groups on the nodes")
4080     for node in oth_node, tgt_node:
4081       res = results.get(node, False)
4082       if not res or my_vg not in res:
4083         raise errors.OpExecError("Volume group '%s' not found on %s" %
4084                                  (my_vg, node))
4085     for dev in instance.disks:
4086       if not dev.iv_name in self.op.disks:
4087         continue
4088       for node in tgt_node, oth_node:
4089         info("checking %s on %s" % (dev.iv_name, node))
4090         cfg.SetDiskID(dev, node)
4091         if not rpc.call_blockdev_find(node, dev):
4092           raise errors.OpExecError("Can't find device %s on node %s" %
4093                                    (dev.iv_name, node))
4094
4095     # Step: check other node consistency
4096     self.proc.LogStep(2, steps_total, "check peer consistency")
4097     for dev in instance.disks:
4098       if not dev.iv_name in self.op.disks:
4099         continue
4100       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4101       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4102                                    oth_node==instance.primary_node):
4103         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4104                                  " to replace disks on this node (%s)" %
4105                                  (oth_node, tgt_node))
4106
4107     # Step: create new storage
4108     self.proc.LogStep(3, steps_total, "allocate new storage")
4109     for dev in instance.disks:
4110       if not dev.iv_name in self.op.disks:
4111         continue
4112       size = dev.size
4113       cfg.SetDiskID(dev, tgt_node)
4114       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4115       names = _GenerateUniqueNames(cfg, lv_names)
4116       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4117                              logical_id=(vgname, names[0]))
4118       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4119                              logical_id=(vgname, names[1]))
4120       new_lvs = [lv_data, lv_meta]
4121       old_lvs = dev.children
4122       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4123       info("creating new local storage on %s for %s" %
4124            (tgt_node, dev.iv_name))
4125       # since we *always* want to create this LV, we use the
4126       # _Create...OnPrimary (which forces the creation), even if we
4127       # are talking about the secondary node
4128       for new_lv in new_lvs:
4129         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4130                                         _GetInstanceInfoText(instance)):
4131           raise errors.OpExecError("Failed to create new LV named '%s' on"
4132                                    " node '%s'" %
4133                                    (new_lv.logical_id[1], tgt_node))
4134
4135     # Step: for each lv, detach+rename*2+attach
4136     self.proc.LogStep(4, steps_total, "change drbd configuration")
4137     for dev, old_lvs, new_lvs in iv_names.itervalues():
4138       info("detaching %s drbd from local storage" % dev.iv_name)
4139       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4140         raise errors.OpExecError("Can't detach drbd from local storage on node"
4141                                  " %s for device %s" % (tgt_node, dev.iv_name))
4142       #dev.children = []
4143       #cfg.Update(instance)
4144
4145       # ok, we created the new LVs, so now we know we have the needed
4146       # storage; as such, we proceed on the target node to rename
4147       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4148       # using the assumption that logical_id == physical_id (which in
4149       # turn is the unique_id on that node)
4150
4151       # FIXME(iustin): use a better name for the replaced LVs
4152       temp_suffix = int(time.time())
4153       ren_fn = lambda d, suff: (d.physical_id[0],
4154                                 d.physical_id[1] + "_replaced-%s" % suff)
4155       # build the rename list based on what LVs exist on the node
4156       rlist = []
4157       for to_ren in old_lvs:
4158         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4159         if find_res is not None: # device exists
4160           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4161
4162       info("renaming the old LVs on the target node")
4163       if not rpc.call_blockdev_rename(tgt_node, rlist):
4164         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4165       # now we rename the new LVs to the old LVs
4166       info("renaming the new LVs on the target node")
4167       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4168       if not rpc.call_blockdev_rename(tgt_node, rlist):
4169         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4170
4171       for old, new in zip(old_lvs, new_lvs):
4172         new.logical_id = old.logical_id
4173         cfg.SetDiskID(new, tgt_node)
4174
4175       for disk in old_lvs:
4176         disk.logical_id = ren_fn(disk, temp_suffix)
4177         cfg.SetDiskID(disk, tgt_node)
4178
4179       # now that the new lvs have the old name, we can add them to the device
4180       info("adding new mirror component on %s" % tgt_node)
4181       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4182         for new_lv in new_lvs:
4183           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4184             warning("Can't rollback device %s", hint="manually cleanup unused"
4185                     " logical volumes")
4186         raise errors.OpExecError("Can't add local storage to drbd")
4187
4188       dev.children = new_lvs
4189       cfg.Update(instance)
4190
4191     # Step: wait for sync
4192
4193     # this can fail as the old devices are degraded and _WaitForSync
4194     # does a combined result over all disks, so we don't check its
4195     # return value
4196     self.proc.LogStep(5, steps_total, "sync devices")
4197     _WaitForSync(cfg, instance, self.proc, unlock=True)
4198
4199     # so check manually all the devices
4200     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4201       cfg.SetDiskID(dev, instance.primary_node)
4202       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4203       if is_degr:
4204         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4205
4206     # Step: remove old storage
4207     self.proc.LogStep(6, steps_total, "removing old storage")
4208     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4209       info("remove logical volumes for %s" % name)
4210       for lv in old_lvs:
4211         cfg.SetDiskID(lv, tgt_node)
4212         if not rpc.call_blockdev_remove(tgt_node, lv):
4213           warning("Can't remove old LV", hint="manually remove unused LVs")
4214           continue
4215
4216   def _ExecD8Secondary(self, feedback_fn):
4217     """Replace the secondary node for drbd8.
4218
4219     The algorithm for replace is quite complicated:
4220       - for all disks of the instance:
4221         - create new LVs on the new node with same names
4222         - shutdown the drbd device on the old secondary
4223         - disconnect the drbd network on the primary
4224         - create the drbd device on the new secondary
4225         - network attach the drbd on the primary, using an artifice:
4226           the drbd code for Attach() will connect to the network if it
4227           finds a device which is connected to the good local disks but
4228           not network enabled
4229       - wait for sync across all devices
4230       - remove all disks from the old secondary
4231
4232     Failures are not very well handled.
4233
4234     """
4235     steps_total = 6
4236     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4237     instance = self.instance
4238     iv_names = {}
4239     vgname = self.cfg.GetVGName()
4240     # start of work
4241     cfg = self.cfg
4242     old_node = self.tgt_node
4243     new_node = self.new_node
4244     pri_node = instance.primary_node
4245
4246     # Step: check device activation
4247     self.proc.LogStep(1, steps_total, "check device existence")
4248     info("checking volume groups")
4249     my_vg = cfg.GetVGName()
4250     results = rpc.call_vg_list([pri_node, new_node])
4251     if not results:
4252       raise errors.OpExecError("Can't list volume groups on the nodes")
4253     for node in pri_node, new_node:
4254       res = results.get(node, False)
4255       if not res or my_vg not in res:
4256         raise errors.OpExecError("Volume group '%s' not found on %s" %
4257                                  (my_vg, node))
4258     for dev in instance.disks:
4259       if not dev.iv_name in self.op.disks:
4260         continue
4261       info("checking %s on %s" % (dev.iv_name, pri_node))
4262       cfg.SetDiskID(dev, pri_node)
4263       if not rpc.call_blockdev_find(pri_node, dev):
4264         raise errors.OpExecError("Can't find device %s on node %s" %
4265                                  (dev.iv_name, pri_node))
4266
4267     # Step: check other node consistency
4268     self.proc.LogStep(2, steps_total, "check peer consistency")
4269     for dev in instance.disks:
4270       if not dev.iv_name in self.op.disks:
4271         continue
4272       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4273       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4274         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4275                                  " unsafe to replace the secondary" %
4276                                  pri_node)
4277
4278     # Step: create new storage
4279     self.proc.LogStep(3, steps_total, "allocate new storage")
4280     for dev in instance.disks:
4281       size = dev.size
4282       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4283       # since we *always* want to create this LV, we use the
4284       # _Create...OnPrimary (which forces the creation), even if we
4285       # are talking about the secondary node
4286       for new_lv in dev.children:
4287         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4288                                         _GetInstanceInfoText(instance)):
4289           raise errors.OpExecError("Failed to create new LV named '%s' on"
4290                                    " node '%s'" %
4291                                    (new_lv.logical_id[1], new_node))
4292
4293       iv_names[dev.iv_name] = (dev, dev.children)
4294
4295     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4296     for dev in instance.disks:
4297       size = dev.size
4298       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4299       # create new devices on new_node
4300       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4301                               logical_id=(pri_node, new_node,
4302                                           dev.logical_id[2]),
4303                               children=dev.children)
4304       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4305                                         new_drbd, False,
4306                                       _GetInstanceInfoText(instance)):
4307         raise errors.OpExecError("Failed to create new DRBD on"
4308                                  " node '%s'" % new_node)
4309
4310     for dev in instance.disks:
4311       # we have new devices, shutdown the drbd on the old secondary
4312       info("shutting down drbd for %s on old node" % dev.iv_name)
4313       cfg.SetDiskID(dev, old_node)
4314       if not rpc.call_blockdev_shutdown(old_node, dev):
4315         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4316                 hint="Please cleanup this device manually as soon as possible")
4317
4318     info("detaching primary drbds from the network (=> standalone)")
4319     done = 0
4320     for dev in instance.disks:
4321       cfg.SetDiskID(dev, pri_node)
4322       # set the physical (unique in bdev terms) id to None, meaning
4323       # detach from network
4324       dev.physical_id = (None,) * len(dev.physical_id)
4325       # and 'find' the device, which will 'fix' it to match the
4326       # standalone state
4327       if rpc.call_blockdev_find(pri_node, dev):
4328         done += 1
4329       else:
4330         warning("Failed to detach drbd %s from network, unusual case" %
4331                 dev.iv_name)
4332
4333     if not done:
4334       # no detaches succeeded (very unlikely)
4335       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4336
4337     # if we managed to detach at least one, we update all the disks of
4338     # the instance to point to the new secondary
4339     info("updating instance configuration")
4340     for dev in instance.disks:
4341       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4342       cfg.SetDiskID(dev, pri_node)
4343     cfg.Update(instance)
4344
4345     # and now perform the drbd attach
4346     info("attaching primary drbds to new secondary (standalone => connected)")
4347     failures = []
4348     for dev in instance.disks:
4349       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4350       # since the attach is smart, it's enough to 'find' the device,
4351       # it will automatically activate the network, if the physical_id
4352       # is correct
4353       cfg.SetDiskID(dev, pri_node)
4354       if not rpc.call_blockdev_find(pri_node, dev):
4355         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4356                 "please do a gnt-instance info to see the status of disks")
4357
4358     # this can fail as the old devices are degraded and _WaitForSync
4359     # does a combined result over all disks, so we don't check its
4360     # return value
4361     self.proc.LogStep(5, steps_total, "sync devices")
4362     _WaitForSync(cfg, instance, self.proc, unlock=True)
4363
4364     # so check manually all the devices
4365     for name, (dev, old_lvs) in iv_names.iteritems():
4366       cfg.SetDiskID(dev, pri_node)
4367       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4368       if is_degr:
4369         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4370
4371     self.proc.LogStep(6, steps_total, "removing old storage")
4372     for name, (dev, old_lvs) in iv_names.iteritems():
4373       info("remove logical volumes for %s" % name)
4374       for lv in old_lvs:
4375         cfg.SetDiskID(lv, old_node)
4376         if not rpc.call_blockdev_remove(old_node, lv):
4377           warning("Can't remove LV on old secondary",
4378                   hint="Cleanup stale volumes by hand")
4379
4380   def Exec(self, feedback_fn):
4381     """Execute disk replacement.
4382
4383     This dispatches the disk replacement to the appropriate handler.
4384
4385     """
4386     instance = self.instance
4387     if instance.disk_template == constants.DT_REMOTE_RAID1:
4388       fn = self._ExecRR1
4389     elif instance.disk_template == constants.DT_DRBD8:
4390       if self.op.remote_node is None:
4391         fn = self._ExecD8DiskOnly
4392       else:
4393         fn = self._ExecD8Secondary
4394     else:
4395       raise errors.ProgrammerError("Unhandled disk replacement case")
4396     return fn(feedback_fn)
4397
4398
4399 class LUQueryInstanceData(NoHooksLU):
4400   """Query runtime instance data.
4401
4402   """
4403   _OP_REQP = ["instances"]
4404
4405   def CheckPrereq(self):
4406     """Check prerequisites.
4407
4408     This only checks the optional instance list against the existing names.
4409
4410     """
4411     if not isinstance(self.op.instances, list):
4412       raise errors.OpPrereqError("Invalid argument type 'instances'")
4413     if self.op.instances:
4414       self.wanted_instances = []
4415       names = self.op.instances
4416       for name in names:
4417         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4418         if instance is None:
4419           raise errors.OpPrereqError("No such instance name '%s'" % name)
4420         self.wanted_instances.append(instance)
4421     else:
4422       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4423                                in self.cfg.GetInstanceList()]
4424     return
4425
4426
4427   def _ComputeDiskStatus(self, instance, snode, dev):
4428     """Compute block device status.
4429
4430     """
4431     self.cfg.SetDiskID(dev, instance.primary_node)
4432     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4433     if dev.dev_type in constants.LDS_DRBD:
4434       # we change the snode then (otherwise we use the one passed in)
4435       if dev.logical_id[0] == instance.primary_node:
4436         snode = dev.logical_id[1]
4437       else:
4438         snode = dev.logical_id[0]
4439
4440     if snode:
4441       self.cfg.SetDiskID(dev, snode)
4442       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4443     else:
4444       dev_sstatus = None
4445
4446     if dev.children:
4447       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4448                       for child in dev.children]
4449     else:
4450       dev_children = []
4451
4452     data = {
4453       "iv_name": dev.iv_name,
4454       "dev_type": dev.dev_type,
4455       "logical_id": dev.logical_id,
4456       "physical_id": dev.physical_id,
4457       "pstatus": dev_pstatus,
4458       "sstatus": dev_sstatus,
4459       "children": dev_children,
4460       }
4461
4462     return data
4463
4464   def Exec(self, feedback_fn):
4465     """Gather and return data"""
4466     result = {}
4467     for instance in self.wanted_instances:
4468       remote_info = rpc.call_instance_info(instance.primary_node,
4469                                                 instance.name)
4470       if remote_info and "state" in remote_info:
4471         remote_state = "up"
4472       else:
4473         remote_state = "down"
4474       if instance.status == "down":
4475         config_state = "down"
4476       else:
4477         config_state = "up"
4478
4479       disks = [self._ComputeDiskStatus(instance, None, device)
4480                for device in instance.disks]
4481
4482       idict = {
4483         "name": instance.name,
4484         "config_state": config_state,
4485         "run_state": remote_state,
4486         "pnode": instance.primary_node,
4487         "snodes": instance.secondary_nodes,
4488         "os": instance.os,
4489         "memory": instance.memory,
4490         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4491         "disks": disks,
4492         "vcpus": instance.vcpus,
4493         }
4494
4495       htkind = self.sstore.GetHypervisorType()
4496       if htkind == constants.HT_XEN_PVM30:
4497         idict["kernel_path"] = instance.kernel_path
4498         idict["initrd_path"] = instance.initrd_path
4499
4500       if htkind == constants.HT_XEN_HVM31:
4501         idict["hvm_boot_order"] = instance.hvm_boot_order
4502         idict["hvm_acpi"] = instance.hvm_acpi
4503         idict["hvm_pae"] = instance.hvm_pae
4504         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4505
4506       if htkind in constants.HTS_REQ_PORT:
4507         idict["vnc_bind_address"] = instance.vnc_bind_address
4508         idict["network_port"] = instance.network_port
4509
4510       result[instance.name] = idict
4511
4512     return result
4513
4514
4515 class LUSetInstanceParms(LogicalUnit):
4516   """Modifies an instances's parameters.
4517
4518   """
4519   HPATH = "instance-modify"
4520   HTYPE = constants.HTYPE_INSTANCE
4521   _OP_REQP = ["instance_name"]
4522
4523   def BuildHooksEnv(self):
4524     """Build hooks env.
4525
4526     This runs on the master, primary and secondaries.
4527
4528     """
4529     args = dict()
4530     if self.mem:
4531       args['memory'] = self.mem
4532     if self.vcpus:
4533       args['vcpus'] = self.vcpus
4534     if self.do_ip or self.do_bridge or self.mac:
4535       if self.do_ip:
4536         ip = self.ip
4537       else:
4538         ip = self.instance.nics[0].ip
4539       if self.bridge:
4540         bridge = self.bridge
4541       else:
4542         bridge = self.instance.nics[0].bridge
4543       if self.mac:
4544         mac = self.mac
4545       else:
4546         mac = self.instance.nics[0].mac
4547       args['nics'] = [(ip, bridge, mac)]
4548     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4549     nl = [self.sstore.GetMasterNode(),
4550           self.instance.primary_node] + list(self.instance.secondary_nodes)
4551     return env, nl, nl
4552
4553   def CheckPrereq(self):
4554     """Check prerequisites.
4555
4556     This only checks the instance list against the existing names.
4557
4558     """
4559     self.mem = getattr(self.op, "mem", None)
4560     self.vcpus = getattr(self.op, "vcpus", None)
4561     self.ip = getattr(self.op, "ip", None)
4562     self.mac = getattr(self.op, "mac", None)
4563     self.bridge = getattr(self.op, "bridge", None)
4564     self.kernel_path = getattr(self.op, "kernel_path", None)
4565     self.initrd_path = getattr(self.op, "initrd_path", None)
4566     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4567     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4568     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4569     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4570     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4571     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4572                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4573                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4574                  self.vnc_bind_address]
4575     if all_parms.count(None) == len(all_parms):
4576       raise errors.OpPrereqError("No changes submitted")
4577     if self.mem is not None:
4578       try:
4579         self.mem = int(self.mem)
4580       except ValueError, err:
4581         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4582     if self.vcpus is not None:
4583       try:
4584         self.vcpus = int(self.vcpus)
4585       except ValueError, err:
4586         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4587     if self.ip is not None:
4588       self.do_ip = True
4589       if self.ip.lower() == "none":
4590         self.ip = None
4591       else:
4592         if not utils.IsValidIP(self.ip):
4593           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4594     else:
4595       self.do_ip = False
4596     self.do_bridge = (self.bridge is not None)
4597     if self.mac is not None:
4598       if self.cfg.IsMacInUse(self.mac):
4599         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4600                                    self.mac)
4601       if not utils.IsValidMac(self.mac):
4602         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4603
4604     if self.kernel_path is not None:
4605       self.do_kernel_path = True
4606       if self.kernel_path == constants.VALUE_NONE:
4607         raise errors.OpPrereqError("Can't set instance to no kernel")
4608
4609       if self.kernel_path != constants.VALUE_DEFAULT:
4610         if not os.path.isabs(self.kernel_path):
4611           raise errors.OpPrereqError("The kernel path must be an absolute"
4612                                     " filename")
4613     else:
4614       self.do_kernel_path = False
4615
4616     if self.initrd_path is not None:
4617       self.do_initrd_path = True
4618       if self.initrd_path not in (constants.VALUE_NONE,
4619                                   constants.VALUE_DEFAULT):
4620         if not os.path.isabs(self.initrd_path):
4621           raise errors.OpPrereqError("The initrd path must be an absolute"
4622                                     " filename")
4623     else:
4624       self.do_initrd_path = False
4625
4626     # boot order verification
4627     if self.hvm_boot_order is not None:
4628       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4629         if len(self.hvm_boot_order.strip("acdn")) != 0:
4630           raise errors.OpPrereqError("invalid boot order specified,"
4631                                      " must be one or more of [acdn]"
4632                                      " or 'default'")
4633
4634     # hvm_cdrom_image_path verification
4635     if self.op.hvm_cdrom_image_path is not None:
4636       if not os.path.isabs(self.op.hvm_cdrom_image_path):
4637         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4638                                    " be an absolute path or None, not %s" %
4639                                    self.op.hvm_cdrom_image_path)
4640       if not os.path.isfile(self.op.hvm_cdrom_image_path):
4641         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4642                                    " regular file or a symlink pointing to"
4643                                    " an existing regular file, not %s" %
4644                                    self.op.hvm_cdrom_image_path)
4645
4646     # vnc_bind_address verification
4647     if self.op.vnc_bind_address is not None:
4648       if not utils.IsValidIP(self.op.vnc_bind_address):
4649         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4650                                    " like a valid IP address" %
4651                                    self.op.vnc_bind_address)
4652
4653     instance = self.cfg.GetInstanceInfo(
4654       self.cfg.ExpandInstanceName(self.op.instance_name))
4655     if instance is None:
4656       raise errors.OpPrereqError("No such instance name '%s'" %
4657                                  self.op.instance_name)
4658     self.op.instance_name = instance.name
4659     self.instance = instance
4660     return
4661
4662   def Exec(self, feedback_fn):
4663     """Modifies an instance.
4664
4665     All parameters take effect only at the next restart of the instance.
4666     """
4667     result = []
4668     instance = self.instance
4669     if self.mem:
4670       instance.memory = self.mem
4671       result.append(("mem", self.mem))
4672     if self.vcpus:
4673       instance.vcpus = self.vcpus
4674       result.append(("vcpus",  self.vcpus))
4675     if self.do_ip:
4676       instance.nics[0].ip = self.ip
4677       result.append(("ip", self.ip))
4678     if self.bridge:
4679       instance.nics[0].bridge = self.bridge
4680       result.append(("bridge", self.bridge))
4681     if self.mac:
4682       instance.nics[0].mac = self.mac
4683       result.append(("mac", self.mac))
4684     if self.do_kernel_path:
4685       instance.kernel_path = self.kernel_path
4686       result.append(("kernel_path", self.kernel_path))
4687     if self.do_initrd_path:
4688       instance.initrd_path = self.initrd_path
4689       result.append(("initrd_path", self.initrd_path))
4690     if self.hvm_boot_order:
4691       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4692         instance.hvm_boot_order = None
4693       else:
4694         instance.hvm_boot_order = self.hvm_boot_order
4695       result.append(("hvm_boot_order", self.hvm_boot_order))
4696     if self.hvm_acpi:
4697       instance.hvm_acpi = self.hvm_acpi
4698       result.append(("hvm_acpi", self.hvm_acpi))
4699     if self.hvm_pae:
4700       instance.hvm_pae = self.hvm_pae
4701       result.append(("hvm_pae", self.hvm_pae))
4702     if self.hvm_cdrom_image_path:
4703       instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4704       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4705     if self.vnc_bind_address:
4706       instance.vnc_bind_address = self.vnc_bind_address
4707       result.append(("vnc_bind_address", self.vnc_bind_address))
4708
4709     self.cfg.AddInstance(instance)
4710
4711     return result
4712
4713
4714 class LUQueryExports(NoHooksLU):
4715   """Query the exports list
4716
4717   """
4718   _OP_REQP = []
4719
4720   def CheckPrereq(self):
4721     """Check that the nodelist contains only existing nodes.
4722
4723     """
4724     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4725
4726   def Exec(self, feedback_fn):
4727     """Compute the list of all the exported system images.
4728
4729     Returns:
4730       a dictionary with the structure node->(export-list)
4731       where export-list is a list of the instances exported on
4732       that node.
4733
4734     """
4735     return rpc.call_export_list(self.nodes)
4736
4737
4738 class LUExportInstance(LogicalUnit):
4739   """Export an instance to an image in the cluster.
4740
4741   """
4742   HPATH = "instance-export"
4743   HTYPE = constants.HTYPE_INSTANCE
4744   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4745
4746   def BuildHooksEnv(self):
4747     """Build hooks env.
4748
4749     This will run on the master, primary node and target node.
4750
4751     """
4752     env = {
4753       "EXPORT_NODE": self.op.target_node,
4754       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4755       }
4756     env.update(_BuildInstanceHookEnvByObject(self.instance))
4757     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4758           self.op.target_node]
4759     return env, nl, nl
4760
4761   def CheckPrereq(self):
4762     """Check prerequisites.
4763
4764     This checks that the instance and node names are valid.
4765
4766     """
4767     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4768     self.instance = self.cfg.GetInstanceInfo(instance_name)
4769     if self.instance is None:
4770       raise errors.OpPrereqError("Instance '%s' not found" %
4771                                  self.op.instance_name)
4772
4773     # node verification
4774     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4775     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4776
4777     if self.dst_node is None:
4778       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4779                                  self.op.target_node)
4780     self.op.target_node = self.dst_node.name
4781
4782   def Exec(self, feedback_fn):
4783     """Export an instance to an image in the cluster.
4784
4785     """
4786     instance = self.instance
4787     dst_node = self.dst_node
4788     src_node = instance.primary_node
4789     if self.op.shutdown:
4790       # shutdown the instance, but not the disks
4791       if not rpc.call_instance_shutdown(src_node, instance):
4792         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4793                                  (instance.name, src_node))
4794
4795     vgname = self.cfg.GetVGName()
4796
4797     snap_disks = []
4798
4799     try:
4800       for disk in instance.disks:
4801         if disk.iv_name == "sda":
4802           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4803           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4804
4805           if not new_dev_name:
4806             logger.Error("could not snapshot block device %s on node %s" %
4807                          (disk.logical_id[1], src_node))
4808           else:
4809             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4810                                       logical_id=(vgname, new_dev_name),
4811                                       physical_id=(vgname, new_dev_name),
4812                                       iv_name=disk.iv_name)
4813             snap_disks.append(new_dev)
4814
4815     finally:
4816       if self.op.shutdown and instance.status == "up":
4817         if not rpc.call_instance_start(src_node, instance, None):
4818           _ShutdownInstanceDisks(instance, self.cfg)
4819           raise errors.OpExecError("Could not start instance")
4820
4821     # TODO: check for size
4822
4823     for dev in snap_disks:
4824       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4825                                            instance):
4826         logger.Error("could not export block device %s from node"
4827                      " %s to node %s" %
4828                      (dev.logical_id[1], src_node, dst_node.name))
4829       if not rpc.call_blockdev_remove(src_node, dev):
4830         logger.Error("could not remove snapshot block device %s from"
4831                      " node %s" % (dev.logical_id[1], src_node))
4832
4833     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4834       logger.Error("could not finalize export for instance %s on node %s" %
4835                    (instance.name, dst_node.name))
4836
4837     nodelist = self.cfg.GetNodeList()
4838     nodelist.remove(dst_node.name)
4839
4840     # on one-node clusters nodelist will be empty after the removal
4841     # if we proceed the backup would be removed because OpQueryExports
4842     # substitutes an empty list with the full cluster node list.
4843     if nodelist:
4844       op = opcodes.OpQueryExports(nodes=nodelist)
4845       exportlist = self.proc.ChainOpCode(op)
4846       for node in exportlist:
4847         if instance.name in exportlist[node]:
4848           if not rpc.call_export_remove(node, instance.name):
4849             logger.Error("could not remove older export for instance %s"
4850                          " on node %s" % (instance.name, node))
4851
4852
4853 class LURemoveExport(NoHooksLU):
4854   """Remove exports related to the named instance.
4855
4856   """
4857   _OP_REQP = ["instance_name"]
4858
4859   def CheckPrereq(self):
4860     """Check prerequisites.
4861     """
4862     pass
4863
4864   def Exec(self, feedback_fn):
4865     """Remove any export.
4866
4867     """
4868     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4869     # If the instance was not found we'll try with the name that was passed in.
4870     # This will only work if it was an FQDN, though.
4871     fqdn_warn = False
4872     if not instance_name:
4873       fqdn_warn = True
4874       instance_name = self.op.instance_name
4875
4876     op = opcodes.OpQueryExports(nodes=[])
4877     exportlist = self.proc.ChainOpCode(op)
4878     found = False
4879     for node in exportlist:
4880       if instance_name in exportlist[node]:
4881         found = True
4882         if not rpc.call_export_remove(node, instance_name):
4883           logger.Error("could not remove export for instance %s"
4884                        " on node %s" % (instance_name, node))
4885
4886     if fqdn_warn and not found:
4887       feedback_fn("Export not found. If trying to remove an export belonging"
4888                   " to a deleted instance please use its Fully Qualified"
4889                   " Domain Name.")
4890
4891
4892 class TagsLU(NoHooksLU):
4893   """Generic tags LU.
4894
4895   This is an abstract class which is the parent of all the other tags LUs.
4896
4897   """
4898   def CheckPrereq(self):
4899     """Check prerequisites.
4900
4901     """
4902     if self.op.kind == constants.TAG_CLUSTER:
4903       self.target = self.cfg.GetClusterInfo()
4904     elif self.op.kind == constants.TAG_NODE:
4905       name = self.cfg.ExpandNodeName(self.op.name)
4906       if name is None:
4907         raise errors.OpPrereqError("Invalid node name (%s)" %
4908                                    (self.op.name,))
4909       self.op.name = name
4910       self.target = self.cfg.GetNodeInfo(name)
4911     elif self.op.kind == constants.TAG_INSTANCE:
4912       name = self.cfg.ExpandInstanceName(self.op.name)
4913       if name is None:
4914         raise errors.OpPrereqError("Invalid instance name (%s)" %
4915                                    (self.op.name,))
4916       self.op.name = name
4917       self.target = self.cfg.GetInstanceInfo(name)
4918     else:
4919       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4920                                  str(self.op.kind))
4921
4922
4923 class LUGetTags(TagsLU):
4924   """Returns the tags of a given object.
4925
4926   """
4927   _OP_REQP = ["kind", "name"]
4928
4929   def Exec(self, feedback_fn):
4930     """Returns the tag list.
4931
4932     """
4933     return self.target.GetTags()
4934
4935
4936 class LUSearchTags(NoHooksLU):
4937   """Searches the tags for a given pattern.
4938
4939   """
4940   _OP_REQP = ["pattern"]
4941
4942   def CheckPrereq(self):
4943     """Check prerequisites.
4944
4945     This checks the pattern passed for validity by compiling it.
4946
4947     """
4948     try:
4949       self.re = re.compile(self.op.pattern)
4950     except re.error, err:
4951       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4952                                  (self.op.pattern, err))
4953
4954   def Exec(self, feedback_fn):
4955     """Returns the tag list.
4956
4957     """
4958     cfg = self.cfg
4959     tgts = [("/cluster", cfg.GetClusterInfo())]
4960     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4961     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4962     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4963     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4964     results = []
4965     for path, target in tgts:
4966       for tag in target.GetTags():
4967         if self.re.search(tag):
4968           results.append((path, tag))
4969     return results
4970
4971
4972 class LUAddTags(TagsLU):
4973   """Sets a tag on a given object.
4974
4975   """
4976   _OP_REQP = ["kind", "name", "tags"]
4977
4978   def CheckPrereq(self):
4979     """Check prerequisites.
4980
4981     This checks the type and length of the tag name and value.
4982
4983     """
4984     TagsLU.CheckPrereq(self)
4985     for tag in self.op.tags:
4986       objects.TaggableObject.ValidateTag(tag)
4987
4988   def Exec(self, feedback_fn):
4989     """Sets the tag.
4990
4991     """
4992     try:
4993       for tag in self.op.tags:
4994         self.target.AddTag(tag)
4995     except errors.TagError, err:
4996       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4997     try:
4998       self.cfg.Update(self.target)
4999     except errors.ConfigurationError:
5000       raise errors.OpRetryError("There has been a modification to the"
5001                                 " config file and the operation has been"
5002                                 " aborted. Please retry.")
5003
5004
5005 class LUDelTags(TagsLU):
5006   """Delete a list of tags from a given object.
5007
5008   """
5009   _OP_REQP = ["kind", "name", "tags"]
5010
5011   def CheckPrereq(self):
5012     """Check prerequisites.
5013
5014     This checks that we have the given tag.
5015
5016     """
5017     TagsLU.CheckPrereq(self)
5018     for tag in self.op.tags:
5019       objects.TaggableObject.ValidateTag(tag)
5020     del_tags = frozenset(self.op.tags)
5021     cur_tags = self.target.GetTags()
5022     if not del_tags <= cur_tags:
5023       diff_tags = del_tags - cur_tags
5024       diff_names = ["'%s'" % tag for tag in diff_tags]
5025       diff_names.sort()
5026       raise errors.OpPrereqError("Tag(s) %s not found" %
5027                                  (",".join(diff_names)))
5028
5029   def Exec(self, feedback_fn):
5030     """Remove the tag from the object.
5031
5032     """
5033     for tag in self.op.tags:
5034       self.target.RemoveTag(tag)
5035     try:
5036       self.cfg.Update(self.target)
5037     except errors.ConfigurationError:
5038       raise errors.OpRetryError("There has been a modification to the"
5039                                 " config file and the operation has been"
5040                                 " aborted. Please retry.")
5041
5042 class LUTestDelay(NoHooksLU):
5043   """Sleep for a specified amount of time.
5044
5045   This LU sleeps on the master and/or nodes for a specified amoutn of
5046   time.
5047
5048   """
5049   _OP_REQP = ["duration", "on_master", "on_nodes"]
5050
5051   def CheckPrereq(self):
5052     """Check prerequisites.
5053
5054     This checks that we have a good list of nodes and/or the duration
5055     is valid.
5056
5057     """
5058
5059     if self.op.on_nodes:
5060       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5061
5062   def Exec(self, feedback_fn):
5063     """Do the actual sleep.
5064
5065     """
5066     if self.op.on_master:
5067       if not utils.TestDelay(self.op.duration):
5068         raise errors.OpExecError("Error during master delay test")
5069     if self.op.on_nodes:
5070       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5071       if not result:
5072         raise errors.OpExecError("Complete failure from rpc call")
5073       for node, node_result in result.items():
5074         if not node_result:
5075           raise errors.OpExecError("Failure during rpc call to node %s,"
5076                                    " result: %s" % (node, node_result))
5077
5078
5079 class IAllocator(object):
5080   """IAllocator framework.
5081
5082   An IAllocator instance has three sets of attributes:
5083     - cfg/sstore that are needed to query the cluster
5084     - input data (all members of the _KEYS class attribute are required)
5085     - four buffer attributes (in|out_data|text), that represent the
5086       input (to the external script) in text and data structure format,
5087       and the output from it, again in two formats
5088     - the result variables from the script (success, info, nodes) for
5089       easy usage
5090
5091   """
5092   _ALLO_KEYS = [
5093     "mem_size", "disks", "disk_template",
5094     "os", "tags", "nics", "vcpus",
5095     ]
5096   _RELO_KEYS = [
5097     "relocate_from",
5098     ]
5099
5100   def __init__(self, cfg, sstore, mode, name, **kwargs):
5101     self.cfg = cfg
5102     self.sstore = sstore
5103     # init buffer variables
5104     self.in_text = self.out_text = self.in_data = self.out_data = None
5105     # init all input fields so that pylint is happy
5106     self.mode = mode
5107     self.name = name
5108     self.mem_size = self.disks = self.disk_template = None
5109     self.os = self.tags = self.nics = self.vcpus = None
5110     self.relocate_from = None
5111     # computed fields
5112     self.required_nodes = None
5113     # init result fields
5114     self.success = self.info = self.nodes = None
5115     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5116       keyset = self._ALLO_KEYS
5117     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5118       keyset = self._RELO_KEYS
5119     else:
5120       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5121                                    " IAllocator" % self.mode)
5122     for key in kwargs:
5123       if key not in keyset:
5124         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5125                                      " IAllocator" % key)
5126       setattr(self, key, kwargs[key])
5127     for key in keyset:
5128       if key not in kwargs:
5129         raise errors.ProgrammerError("Missing input parameter '%s' to"
5130                                      " IAllocator" % key)
5131     self._BuildInputData()
5132
5133   def _ComputeClusterData(self):
5134     """Compute the generic allocator input data.
5135
5136     This is the data that is independent of the actual operation.
5137
5138     """
5139     cfg = self.cfg
5140     # cluster data
5141     data = {
5142       "version": 1,
5143       "cluster_name": self.sstore.GetClusterName(),
5144       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5145       "hypervisor_type": self.sstore.GetHypervisorType(),
5146       # we don't have job IDs
5147       }
5148
5149     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5150
5151     # node data
5152     node_results = {}
5153     node_list = cfg.GetNodeList()
5154     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5155     for nname in node_list:
5156       ninfo = cfg.GetNodeInfo(nname)
5157       if nname not in node_data or not isinstance(node_data[nname], dict):
5158         raise errors.OpExecError("Can't get data for node %s" % nname)
5159       remote_info = node_data[nname]
5160       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5161                    'vg_size', 'vg_free', 'cpu_total']:
5162         if attr not in remote_info:
5163           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5164                                    (nname, attr))
5165         try:
5166           remote_info[attr] = int(remote_info[attr])
5167         except ValueError, err:
5168           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5169                                    " %s" % (nname, attr, str(err)))
5170       # compute memory used by primary instances
5171       i_p_mem = i_p_up_mem = 0
5172       for iinfo in i_list:
5173         if iinfo.primary_node == nname:
5174           i_p_mem += iinfo.memory
5175           if iinfo.status == "up":
5176             i_p_up_mem += iinfo.memory
5177
5178       # compute memory used by instances
5179       pnr = {
5180         "tags": list(ninfo.GetTags()),
5181         "total_memory": remote_info['memory_total'],
5182         "reserved_memory": remote_info['memory_dom0'],
5183         "free_memory": remote_info['memory_free'],
5184         "i_pri_memory": i_p_mem,
5185         "i_pri_up_memory": i_p_up_mem,
5186         "total_disk": remote_info['vg_size'],
5187         "free_disk": remote_info['vg_free'],
5188         "primary_ip": ninfo.primary_ip,
5189         "secondary_ip": ninfo.secondary_ip,
5190         "total_cpus": remote_info['cpu_total'],
5191         }
5192       node_results[nname] = pnr
5193     data["nodes"] = node_results
5194
5195     # instance data
5196     instance_data = {}
5197     for iinfo in i_list:
5198       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5199                   for n in iinfo.nics]
5200       pir = {
5201         "tags": list(iinfo.GetTags()),
5202         "should_run": iinfo.status == "up",
5203         "vcpus": iinfo.vcpus,
5204         "memory": iinfo.memory,
5205         "os": iinfo.os,
5206         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5207         "nics": nic_data,
5208         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5209         "disk_template": iinfo.disk_template,
5210         }
5211       instance_data[iinfo.name] = pir
5212
5213     data["instances"] = instance_data
5214
5215     self.in_data = data
5216
5217   def _AddNewInstance(self):
5218     """Add new instance data to allocator structure.
5219
5220     This in combination with _AllocatorGetClusterData will create the
5221     correct structure needed as input for the allocator.
5222
5223     The checks for the completeness of the opcode must have already been
5224     done.
5225
5226     """
5227     data = self.in_data
5228     if len(self.disks) != 2:
5229       raise errors.OpExecError("Only two-disk configurations supported")
5230
5231     disk_space = _ComputeDiskSize(self.disk_template,
5232                                   self.disks[0]["size"], self.disks[1]["size"])
5233
5234     if self.disk_template in constants.DTS_NET_MIRROR:
5235       self.required_nodes = 2
5236     else:
5237       self.required_nodes = 1
5238     request = {
5239       "type": "allocate",
5240       "name": self.name,
5241       "disk_template": self.disk_template,
5242       "tags": self.tags,
5243       "os": self.os,
5244       "vcpus": self.vcpus,
5245       "memory": self.mem_size,
5246       "disks": self.disks,
5247       "disk_space_total": disk_space,
5248       "nics": self.nics,
5249       "required_nodes": self.required_nodes,
5250       }
5251     data["request"] = request
5252
5253   def _AddRelocateInstance(self):
5254     """Add relocate instance data to allocator structure.
5255
5256     This in combination with _IAllocatorGetClusterData will create the
5257     correct structure needed as input for the allocator.
5258
5259     The checks for the completeness of the opcode must have already been
5260     done.
5261
5262     """
5263     instance = self.cfg.GetInstanceInfo(self.name)
5264     if instance is None:
5265       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5266                                    " IAllocator" % self.name)
5267
5268     if instance.disk_template not in constants.DTS_NET_MIRROR:
5269       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5270
5271     if len(instance.secondary_nodes) != 1:
5272       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5273
5274     self.required_nodes = 1
5275
5276     disk_space = _ComputeDiskSize(instance.disk_template,
5277                                   instance.disks[0].size,
5278                                   instance.disks[1].size)
5279
5280     request = {
5281       "type": "relocate",
5282       "name": self.name,
5283       "disk_space_total": disk_space,
5284       "required_nodes": self.required_nodes,
5285       "relocate_from": self.relocate_from,
5286       }
5287     self.in_data["request"] = request
5288
5289   def _BuildInputData(self):
5290     """Build input data structures.
5291
5292     """
5293     self._ComputeClusterData()
5294
5295     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5296       self._AddNewInstance()
5297     else:
5298       self._AddRelocateInstance()
5299
5300     self.in_text = serializer.Dump(self.in_data)
5301
5302   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5303     """Run an instance allocator and return the results.
5304
5305     """
5306     data = self.in_text
5307
5308     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5309
5310     if not isinstance(result, tuple) or len(result) != 4:
5311       raise errors.OpExecError("Invalid result from master iallocator runner")
5312
5313     rcode, stdout, stderr, fail = result
5314
5315     if rcode == constants.IARUN_NOTFOUND:
5316       raise errors.OpExecError("Can't find allocator '%s'" % name)
5317     elif rcode == constants.IARUN_FAILURE:
5318         raise errors.OpExecError("Instance allocator call failed: %s,"
5319                                  " output: %s" %
5320                                  (fail, stdout+stderr))
5321     self.out_text = stdout
5322     if validate:
5323       self._ValidateResult()
5324
5325   def _ValidateResult(self):
5326     """Process the allocator results.
5327
5328     This will process and if successful save the result in
5329     self.out_data and the other parameters.
5330
5331     """
5332     try:
5333       rdict = serializer.Load(self.out_text)
5334     except Exception, err:
5335       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5336
5337     if not isinstance(rdict, dict):
5338       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5339
5340     for key in "success", "info", "nodes":
5341       if key not in rdict:
5342         raise errors.OpExecError("Can't parse iallocator results:"
5343                                  " missing key '%s'" % key)
5344       setattr(self, key, rdict[key])
5345
5346     if not isinstance(rdict["nodes"], list):
5347       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5348                                " is not a list")
5349     self.out_data = rdict
5350
5351
5352 class LUTestAllocator(NoHooksLU):
5353   """Run allocator tests.
5354
5355   This LU runs the allocator tests
5356
5357   """
5358   _OP_REQP = ["direction", "mode", "name"]
5359
5360   def CheckPrereq(self):
5361     """Check prerequisites.
5362
5363     This checks the opcode parameters depending on the director and mode test.
5364
5365     """
5366     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5367       for attr in ["name", "mem_size", "disks", "disk_template",
5368                    "os", "tags", "nics", "vcpus"]:
5369         if not hasattr(self.op, attr):
5370           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5371                                      attr)
5372       iname = self.cfg.ExpandInstanceName(self.op.name)
5373       if iname is not None:
5374         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5375                                    iname)
5376       if not isinstance(self.op.nics, list):
5377         raise errors.OpPrereqError("Invalid parameter 'nics'")
5378       for row in self.op.nics:
5379         if (not isinstance(row, dict) or
5380             "mac" not in row or
5381             "ip" not in row or
5382             "bridge" not in row):
5383           raise errors.OpPrereqError("Invalid contents of the"
5384                                      " 'nics' parameter")
5385       if not isinstance(self.op.disks, list):
5386         raise errors.OpPrereqError("Invalid parameter 'disks'")
5387       if len(self.op.disks) != 2:
5388         raise errors.OpPrereqError("Only two-disk configurations supported")
5389       for row in self.op.disks:
5390         if (not isinstance(row, dict) or
5391             "size" not in row or
5392             not isinstance(row["size"], int) or
5393             "mode" not in row or
5394             row["mode"] not in ['r', 'w']):
5395           raise errors.OpPrereqError("Invalid contents of the"
5396                                      " 'disks' parameter")
5397     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5398       if not hasattr(self.op, "name"):
5399         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5400       fname = self.cfg.ExpandInstanceName(self.op.name)
5401       if fname is None:
5402         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5403                                    self.op.name)
5404       self.op.name = fname
5405       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5406     else:
5407       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5408                                  self.op.mode)
5409
5410     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5411       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5412         raise errors.OpPrereqError("Missing allocator name")
5413     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5414       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5415                                  self.op.direction)
5416
5417   def Exec(self, feedback_fn):
5418     """Run the allocator test.
5419
5420     """
5421     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5422       ial = IAllocator(self.cfg, self.sstore,
5423                        mode=self.op.mode,
5424                        name=self.op.name,
5425                        mem_size=self.op.mem_size,
5426                        disks=self.op.disks,
5427                        disk_template=self.op.disk_template,
5428                        os=self.op.os,
5429                        tags=self.op.tags,
5430                        nics=self.op.nics,
5431                        vcpus=self.op.vcpus,
5432                        )
5433     else:
5434       ial = IAllocator(self.cfg, self.sstore,
5435                        mode=self.op.mode,
5436                        name=self.op.name,
5437                        relocate_from=list(self.relocate_from),
5438                        )
5439
5440     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5441       result = ial.in_text
5442     else:
5443       ial.Run(self.op.allocator, validate=False)
5444       result = ial.out_text
5445     return result