Add tags to “gnt-cluster verify” hook environment
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           needed_mem += instance_cfg[instance].memory
836         if nodeinfo['mfree'] < needed_mem:
837           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
838                       " failovers should node %s fail" % (node, prinode))
839           bad = True
840     return bad
841
842   def CheckPrereq(self):
843     """Check prerequisites.
844
845     Transform the list of checks we're going to skip into a set and check that
846     all its members are valid.
847
848     """
849     self.skip_set = frozenset(self.op.skip_checks)
850     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851       raise errors.OpPrereqError("Invalid checks to be skipped specified")
852
853   def BuildHooksEnv(self):
854     """Build hooks env.
855
856     Cluster-Verify hooks just rone in the post phase and their failure makes
857     the output be logged in the verify output and the verification to fail.
858
859     """
860     all_nodes = self.cfg.GetNodeList()
861     tags = self.cfg.GetClusterInfo().GetTags()
862     # TODO: populate the environment with useful information for verify hooks
863     env = {
864       "CLUSTER_TAGS": " ".join(tags),
865       }
866     return env, [], all_nodes
867
868   def Exec(self, feedback_fn):
869     """Verify integrity of cluster, performing various test on nodes.
870
871     """
872     bad = False
873     feedback_fn("* Verifying global settings")
874     for msg in self.cfg.VerifyConfig():
875       feedback_fn("  - ERROR: %s" % msg)
876
877     vg_name = self.cfg.GetVGName()
878     nodelist = utils.NiceSort(self.cfg.GetNodeList())
879     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881     i_non_redundant = [] # Non redundant instances
882     node_volume = {}
883     node_instance = {}
884     node_info = {}
885     instance_cfg = {}
886
887     # FIXME: verify OS list
888     # do local checksums
889     file_names = list(self.sstore.GetFileList())
890     file_names.append(constants.SSL_CERT_FILE)
891     file_names.append(constants.CLUSTER_CONF_FILE)
892     local_checksums = utils.FingerprintFiles(file_names)
893
894     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896     all_instanceinfo = rpc.call_instance_list(nodelist)
897     all_vglist = rpc.call_vg_list(nodelist)
898     node_verify_param = {
899       'filelist': file_names,
900       'nodelist': nodelist,
901       'hypervisor': None,
902       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903                         for node in nodeinfo]
904       }
905     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906     all_rversion = rpc.call_version(nodelist)
907     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
908
909     for node in nodelist:
910       feedback_fn("* Verifying node %s" % node)
911       result = self._VerifyNode(node, file_names, local_checksums,
912                                 all_vglist[node], all_nvinfo[node],
913                                 all_rversion[node], feedback_fn)
914       bad = bad or result
915
916       # node_volume
917       volumeinfo = all_volumeinfo[node]
918
919       if isinstance(volumeinfo, basestring):
920         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
921                     (node, volumeinfo[-400:].encode('string_escape')))
922         bad = True
923         node_volume[node] = {}
924       elif not isinstance(volumeinfo, dict):
925         feedback_fn("  - ERROR: connection to %s failed" % (node,))
926         bad = True
927         continue
928       else:
929         node_volume[node] = volumeinfo
930
931       # node_instance
932       nodeinstance = all_instanceinfo[node]
933       if type(nodeinstance) != list:
934         feedback_fn("  - ERROR: connection to %s failed" % (node,))
935         bad = True
936         continue
937
938       node_instance[node] = nodeinstance
939
940       # node_info
941       nodeinfo = all_ninfo[node]
942       if not isinstance(nodeinfo, dict):
943         feedback_fn("  - ERROR: connection to %s failed" % (node,))
944         bad = True
945         continue
946
947       try:
948         node_info[node] = {
949           "mfree": int(nodeinfo['memory_free']),
950           "dfree": int(nodeinfo['vg_free']),
951           "pinst": [],
952           "sinst": [],
953           # dictionary holding all instances this node is secondary for,
954           # grouped by their primary node. Each key is a cluster node, and each
955           # value is a list of instances which have the key as primary and the
956           # current node as secondary.  this is handy to calculate N+1 memory
957           # availability if you can only failover from a primary to its
958           # secondary.
959           "sinst-by-pnode": {},
960         }
961       except ValueError:
962         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
963         bad = True
964         continue
965
966     node_vol_should = {}
967
968     for instance in instancelist:
969       feedback_fn("* Verifying instance %s" % instance)
970       inst_config = self.cfg.GetInstanceInfo(instance)
971       result =  self._VerifyInstance(instance, inst_config, node_volume,
972                                      node_instance, feedback_fn)
973       bad = bad or result
974
975       inst_config.MapLVsByNode(node_vol_should)
976
977       instance_cfg[instance] = inst_config
978
979       pnode = inst_config.primary_node
980       if pnode in node_info:
981         node_info[pnode]['pinst'].append(instance)
982       else:
983         feedback_fn("  - ERROR: instance %s, connection to primary node"
984                     " %s failed" % (instance, pnode))
985         bad = True
986
987       # If the instance is non-redundant we cannot survive losing its primary
988       # node, so we are not N+1 compliant. On the other hand we have no disk
989       # templates with more than one secondary so that situation is not well
990       # supported either.
991       # FIXME: does not support file-backed instances
992       if len(inst_config.secondary_nodes) == 0:
993         i_non_redundant.append(instance)
994       elif len(inst_config.secondary_nodes) > 1:
995         feedback_fn("  - WARNING: multiple secondaries for instance %s"
996                     % instance)
997
998       for snode in inst_config.secondary_nodes:
999         if snode in node_info:
1000           node_info[snode]['sinst'].append(instance)
1001           if pnode not in node_info[snode]['sinst-by-pnode']:
1002             node_info[snode]['sinst-by-pnode'][pnode] = []
1003           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1004         else:
1005           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1006                       " %s failed" % (instance, snode))
1007
1008     feedback_fn("* Verifying orphan volumes")
1009     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1010                                        feedback_fn)
1011     bad = bad or result
1012
1013     feedback_fn("* Verifying remaining instances")
1014     result = self._VerifyOrphanInstances(instancelist, node_instance,
1015                                          feedback_fn)
1016     bad = bad or result
1017
1018     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1019       feedback_fn("* Verifying N+1 Memory redundancy")
1020       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1021       bad = bad or result
1022
1023     feedback_fn("* Other Notes")
1024     if i_non_redundant:
1025       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1026                   % len(i_non_redundant))
1027
1028     return int(bad)
1029
1030   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1031     """Analize the post-hooks' result, handle it, and send some
1032     nicely-formatted feedback back to the user.
1033
1034     Args:
1035       phase: the hooks phase that has just been run
1036       hooks_results: the results of the multi-node hooks rpc call
1037       feedback_fn: function to send feedback back to the caller
1038       lu_result: previous Exec result
1039
1040     """
1041     # We only really run POST phase hooks, and are only interested in their results
1042     if phase == constants.HOOKS_PHASE_POST:
1043       # Used to change hooks' output to proper indentation
1044       indent_re = re.compile('^', re.M)
1045       feedback_fn("* Hooks Results")
1046       if not hooks_results:
1047         feedback_fn("  - ERROR: general communication failure")
1048         lu_result = 1
1049       else:
1050         for node_name in hooks_results:
1051           show_node_header = True
1052           res = hooks_results[node_name]
1053           if res is False or not isinstance(res, list):
1054             feedback_fn("    Communication failure")
1055             lu_result = 1
1056             continue
1057           for script, hkr, output in res:
1058             if hkr == constants.HKR_FAIL:
1059               # The node header is only shown once, if there are
1060               # failing hooks on that node
1061               if show_node_header:
1062                 feedback_fn("  Node %s:" % node_name)
1063                 show_node_header = False
1064               feedback_fn("    ERROR: Script %s failed, output:" % script)
1065               output = indent_re.sub('      ', output)
1066               feedback_fn("%s" % output)
1067               lu_result = 1
1068
1069       return lu_result
1070
1071
1072 class LUVerifyDisks(NoHooksLU):
1073   """Verifies the cluster disks status.
1074
1075   """
1076   _OP_REQP = []
1077
1078   def CheckPrereq(self):
1079     """Check prerequisites.
1080
1081     This has no prerequisites.
1082
1083     """
1084     pass
1085
1086   def Exec(self, feedback_fn):
1087     """Verify integrity of cluster disks.
1088
1089     """
1090     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1091
1092     vg_name = self.cfg.GetVGName()
1093     nodes = utils.NiceSort(self.cfg.GetNodeList())
1094     instances = [self.cfg.GetInstanceInfo(name)
1095                  for name in self.cfg.GetInstanceList()]
1096
1097     nv_dict = {}
1098     for inst in instances:
1099       inst_lvs = {}
1100       if (inst.status != "up" or
1101           inst.disk_template not in constants.DTS_NET_MIRROR):
1102         continue
1103       inst.MapLVsByNode(inst_lvs)
1104       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1105       for node, vol_list in inst_lvs.iteritems():
1106         for vol in vol_list:
1107           nv_dict[(node, vol)] = inst
1108
1109     if not nv_dict:
1110       return result
1111
1112     node_lvs = rpc.call_volume_list(nodes, vg_name)
1113
1114     to_act = set()
1115     for node in nodes:
1116       # node_volume
1117       lvs = node_lvs[node]
1118
1119       if isinstance(lvs, basestring):
1120         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1121         res_nlvm[node] = lvs
1122       elif not isinstance(lvs, dict):
1123         logger.Info("connection to node %s failed or invalid data returned" %
1124                     (node,))
1125         res_nodes.append(node)
1126         continue
1127
1128       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1129         inst = nv_dict.pop((node, lv_name), None)
1130         if (not lv_online and inst is not None
1131             and inst.name not in res_instances):
1132           res_instances.append(inst.name)
1133
1134     # any leftover items in nv_dict are missing LVs, let's arrange the
1135     # data better
1136     for key, inst in nv_dict.iteritems():
1137       if inst.name not in res_missing:
1138         res_missing[inst.name] = []
1139       res_missing[inst.name].append(key)
1140
1141     return result
1142
1143
1144 class LURenameCluster(LogicalUnit):
1145   """Rename the cluster.
1146
1147   """
1148   HPATH = "cluster-rename"
1149   HTYPE = constants.HTYPE_CLUSTER
1150   _OP_REQP = ["name"]
1151
1152   def BuildHooksEnv(self):
1153     """Build hooks env.
1154
1155     """
1156     env = {
1157       "OP_TARGET": self.sstore.GetClusterName(),
1158       "NEW_NAME": self.op.name,
1159       }
1160     mn = self.sstore.GetMasterNode()
1161     return env, [mn], [mn]
1162
1163   def CheckPrereq(self):
1164     """Verify that the passed name is a valid one.
1165
1166     """
1167     hostname = utils.HostInfo(self.op.name)
1168
1169     new_name = hostname.name
1170     self.ip = new_ip = hostname.ip
1171     old_name = self.sstore.GetClusterName()
1172     old_ip = self.sstore.GetMasterIP()
1173     if new_name == old_name and new_ip == old_ip:
1174       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1175                                  " cluster has changed")
1176     if new_ip != old_ip:
1177       result = utils.RunCmd(["fping", "-q", new_ip])
1178       if not result.failed:
1179         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1180                                    " reachable on the network. Aborting." %
1181                                    new_ip)
1182
1183     self.op.name = new_name
1184
1185   def Exec(self, feedback_fn):
1186     """Rename the cluster.
1187
1188     """
1189     clustername = self.op.name
1190     ip = self.ip
1191     ss = self.sstore
1192
1193     # shutdown the master IP
1194     master = ss.GetMasterNode()
1195     if not rpc.call_node_stop_master(master):
1196       raise errors.OpExecError("Could not disable the master role")
1197
1198     try:
1199       # modify the sstore
1200       ss.SetKey(ss.SS_MASTER_IP, ip)
1201       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1202
1203       # Distribute updated ss config to all nodes
1204       myself = self.cfg.GetNodeInfo(master)
1205       dist_nodes = self.cfg.GetNodeList()
1206       if myself.name in dist_nodes:
1207         dist_nodes.remove(myself.name)
1208
1209       logger.Debug("Copying updated ssconf data to all nodes")
1210       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1211         fname = ss.KeyToFilename(keyname)
1212         result = rpc.call_upload_file(dist_nodes, fname)
1213         for to_node in dist_nodes:
1214           if not result[to_node]:
1215             logger.Error("copy of file %s to node %s failed" %
1216                          (fname, to_node))
1217     finally:
1218       if not rpc.call_node_start_master(master):
1219         logger.Error("Could not re-enable the master role on the master,"
1220                      " please restart manually.")
1221
1222
1223 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1224   """Sleep and poll for an instance's disk to sync.
1225
1226   """
1227   if not instance.disks:
1228     return True
1229
1230   if not oneshot:
1231     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1232
1233   node = instance.primary_node
1234
1235   for dev in instance.disks:
1236     cfgw.SetDiskID(dev, node)
1237
1238   retries = 0
1239   while True:
1240     max_time = 0
1241     done = True
1242     cumul_degraded = False
1243     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1244     if not rstats:
1245       proc.LogWarning("Can't get any data from node %s" % node)
1246       retries += 1
1247       if retries >= 10:
1248         raise errors.RemoteError("Can't contact node %s for mirror data,"
1249                                  " aborting." % node)
1250       time.sleep(6)
1251       continue
1252     retries = 0
1253     for i in range(len(rstats)):
1254       mstat = rstats[i]
1255       if mstat is None:
1256         proc.LogWarning("Can't compute data for node %s/%s" %
1257                         (node, instance.disks[i].iv_name))
1258         continue
1259       # we ignore the ldisk parameter
1260       perc_done, est_time, is_degraded, _ = mstat
1261       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1262       if perc_done is not None:
1263         done = False
1264         if est_time is not None:
1265           rem_time = "%d estimated seconds remaining" % est_time
1266           max_time = est_time
1267         else:
1268           rem_time = "no time estimate"
1269         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1270                      (instance.disks[i].iv_name, perc_done, rem_time))
1271     if done or oneshot:
1272       break
1273
1274     if unlock:
1275       utils.Unlock('cmd')
1276     try:
1277       time.sleep(min(60, max_time))
1278     finally:
1279       if unlock:
1280         utils.Lock('cmd')
1281
1282   if done:
1283     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1284   return not cumul_degraded
1285
1286
1287 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1288   """Check that mirrors are not degraded.
1289
1290   The ldisk parameter, if True, will change the test from the
1291   is_degraded attribute (which represents overall non-ok status for
1292   the device(s)) to the ldisk (representing the local storage status).
1293
1294   """
1295   cfgw.SetDiskID(dev, node)
1296   if ldisk:
1297     idx = 6
1298   else:
1299     idx = 5
1300
1301   result = True
1302   if on_primary or dev.AssembleOnSecondary():
1303     rstats = rpc.call_blockdev_find(node, dev)
1304     if not rstats:
1305       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1306       result = False
1307     else:
1308       result = result and (not rstats[idx])
1309   if dev.children:
1310     for child in dev.children:
1311       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1312
1313   return result
1314
1315
1316 class LUDiagnoseOS(NoHooksLU):
1317   """Logical unit for OS diagnose/query.
1318
1319   """
1320   _OP_REQP = ["output_fields", "names"]
1321
1322   def CheckPrereq(self):
1323     """Check prerequisites.
1324
1325     This always succeeds, since this is a pure query LU.
1326
1327     """
1328     if self.op.names:
1329       raise errors.OpPrereqError("Selective OS query not supported")
1330
1331     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1332     _CheckOutputFields(static=[],
1333                        dynamic=self.dynamic_fields,
1334                        selected=self.op.output_fields)
1335
1336   @staticmethod
1337   def _DiagnoseByOS(node_list, rlist):
1338     """Remaps a per-node return list into an a per-os per-node dictionary
1339
1340       Args:
1341         node_list: a list with the names of all nodes
1342         rlist: a map with node names as keys and OS objects as values
1343
1344       Returns:
1345         map: a map with osnames as keys and as value another map, with
1346              nodes as
1347              keys and list of OS objects as values
1348              e.g. {"debian-etch": {"node1": [<object>,...],
1349                                    "node2": [<object>,]}
1350                   }
1351
1352     """
1353     all_os = {}
1354     for node_name, nr in rlist.iteritems():
1355       if not nr:
1356         continue
1357       for os in nr:
1358         if os.name not in all_os:
1359           # build a list of nodes for this os containing empty lists
1360           # for each node in node_list
1361           all_os[os.name] = {}
1362           for nname in node_list:
1363             all_os[os.name][nname] = []
1364         all_os[os.name][node_name].append(os)
1365     return all_os
1366
1367   def Exec(self, feedback_fn):
1368     """Compute the list of OSes.
1369
1370     """
1371     node_list = self.cfg.GetNodeList()
1372     node_data = rpc.call_os_diagnose(node_list)
1373     if node_data == False:
1374       raise errors.OpExecError("Can't gather the list of OSes")
1375     pol = self._DiagnoseByOS(node_list, node_data)
1376     output = []
1377     for os_name, os_data in pol.iteritems():
1378       row = []
1379       for field in self.op.output_fields:
1380         if field == "name":
1381           val = os_name
1382         elif field == "valid":
1383           val = utils.all([osl and osl[0] for osl in os_data.values()])
1384         elif field == "node_status":
1385           val = {}
1386           for node_name, nos_list in os_data.iteritems():
1387             val[node_name] = [(v.status, v.path) for v in nos_list]
1388         else:
1389           raise errors.ParameterError(field)
1390         row.append(val)
1391       output.append(row)
1392
1393     return output
1394
1395
1396 class LURemoveNode(LogicalUnit):
1397   """Logical unit for removing a node.
1398
1399   """
1400   HPATH = "node-remove"
1401   HTYPE = constants.HTYPE_NODE
1402   _OP_REQP = ["node_name"]
1403
1404   def BuildHooksEnv(self):
1405     """Build hooks env.
1406
1407     This doesn't run on the target node in the pre phase as a failed
1408     node would not allows itself to run.
1409
1410     """
1411     env = {
1412       "OP_TARGET": self.op.node_name,
1413       "NODE_NAME": self.op.node_name,
1414       }
1415     all_nodes = self.cfg.GetNodeList()
1416     all_nodes.remove(self.op.node_name)
1417     return env, all_nodes, all_nodes
1418
1419   def CheckPrereq(self):
1420     """Check prerequisites.
1421
1422     This checks:
1423      - the node exists in the configuration
1424      - it does not have primary or secondary instances
1425      - it's not the master
1426
1427     Any errors are signalled by raising errors.OpPrereqError.
1428
1429     """
1430     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1431     if node is None:
1432       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1433
1434     instance_list = self.cfg.GetInstanceList()
1435
1436     masternode = self.sstore.GetMasterNode()
1437     if node.name == masternode:
1438       raise errors.OpPrereqError("Node is the master node,"
1439                                  " you need to failover first.")
1440
1441     for instance_name in instance_list:
1442       instance = self.cfg.GetInstanceInfo(instance_name)
1443       if node.name == instance.primary_node:
1444         raise errors.OpPrereqError("Instance %s still running on the node,"
1445                                    " please remove first." % instance_name)
1446       if node.name in instance.secondary_nodes:
1447         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1448                                    " please remove first." % instance_name)
1449     self.op.node_name = node.name
1450     self.node = node
1451
1452   def Exec(self, feedback_fn):
1453     """Removes the node from the cluster.
1454
1455     """
1456     node = self.node
1457     logger.Info("stopping the node daemon and removing configs from node %s" %
1458                 node.name)
1459
1460     rpc.call_node_leave_cluster(node.name)
1461
1462     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1463
1464     logger.Info("Removing node %s from config" % node.name)
1465
1466     self.cfg.RemoveNode(node.name)
1467
1468     _RemoveHostFromEtcHosts(node.name)
1469
1470
1471 class LUQueryNodes(NoHooksLU):
1472   """Logical unit for querying nodes.
1473
1474   """
1475   _OP_REQP = ["output_fields", "names"]
1476
1477   def CheckPrereq(self):
1478     """Check prerequisites.
1479
1480     This checks that the fields required are valid output fields.
1481
1482     """
1483     self.dynamic_fields = frozenset([
1484       "dtotal", "dfree",
1485       "mtotal", "mnode", "mfree",
1486       "bootid",
1487       "ctotal",
1488       ])
1489
1490     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1491                                "pinst_list", "sinst_list",
1492                                "pip", "sip"],
1493                        dynamic=self.dynamic_fields,
1494                        selected=self.op.output_fields)
1495
1496     self.wanted = _GetWantedNodes(self, self.op.names)
1497
1498   def Exec(self, feedback_fn):
1499     """Computes the list of nodes and their attributes.
1500
1501     """
1502     nodenames = self.wanted
1503     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1504
1505     # begin data gathering
1506
1507     if self.dynamic_fields.intersection(self.op.output_fields):
1508       live_data = {}
1509       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1510       for name in nodenames:
1511         nodeinfo = node_data.get(name, None)
1512         if nodeinfo:
1513           live_data[name] = {
1514             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1515             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1516             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1517             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1518             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1519             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1520             "bootid": nodeinfo['bootid'],
1521             }
1522         else:
1523           live_data[name] = {}
1524     else:
1525       live_data = dict.fromkeys(nodenames, {})
1526
1527     node_to_primary = dict([(name, set()) for name in nodenames])
1528     node_to_secondary = dict([(name, set()) for name in nodenames])
1529
1530     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1531                              "sinst_cnt", "sinst_list"))
1532     if inst_fields & frozenset(self.op.output_fields):
1533       instancelist = self.cfg.GetInstanceList()
1534
1535       for instance_name in instancelist:
1536         inst = self.cfg.GetInstanceInfo(instance_name)
1537         if inst.primary_node in node_to_primary:
1538           node_to_primary[inst.primary_node].add(inst.name)
1539         for secnode in inst.secondary_nodes:
1540           if secnode in node_to_secondary:
1541             node_to_secondary[secnode].add(inst.name)
1542
1543     # end data gathering
1544
1545     output = []
1546     for node in nodelist:
1547       node_output = []
1548       for field in self.op.output_fields:
1549         if field == "name":
1550           val = node.name
1551         elif field == "pinst_list":
1552           val = list(node_to_primary[node.name])
1553         elif field == "sinst_list":
1554           val = list(node_to_secondary[node.name])
1555         elif field == "pinst_cnt":
1556           val = len(node_to_primary[node.name])
1557         elif field == "sinst_cnt":
1558           val = len(node_to_secondary[node.name])
1559         elif field == "pip":
1560           val = node.primary_ip
1561         elif field == "sip":
1562           val = node.secondary_ip
1563         elif field in self.dynamic_fields:
1564           val = live_data[node.name].get(field, None)
1565         else:
1566           raise errors.ParameterError(field)
1567         node_output.append(val)
1568       output.append(node_output)
1569
1570     return output
1571
1572
1573 class LUQueryNodeVolumes(NoHooksLU):
1574   """Logical unit for getting volumes on node(s).
1575
1576   """
1577   _OP_REQP = ["nodes", "output_fields"]
1578
1579   def CheckPrereq(self):
1580     """Check prerequisites.
1581
1582     This checks that the fields required are valid output fields.
1583
1584     """
1585     self.nodes = _GetWantedNodes(self, self.op.nodes)
1586
1587     _CheckOutputFields(static=["node"],
1588                        dynamic=["phys", "vg", "name", "size", "instance"],
1589                        selected=self.op.output_fields)
1590
1591
1592   def Exec(self, feedback_fn):
1593     """Computes the list of nodes and their attributes.
1594
1595     """
1596     nodenames = self.nodes
1597     volumes = rpc.call_node_volumes(nodenames)
1598
1599     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1600              in self.cfg.GetInstanceList()]
1601
1602     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1603
1604     output = []
1605     for node in nodenames:
1606       if node not in volumes or not volumes[node]:
1607         continue
1608
1609       node_vols = volumes[node][:]
1610       node_vols.sort(key=lambda vol: vol['dev'])
1611
1612       for vol in node_vols:
1613         node_output = []
1614         for field in self.op.output_fields:
1615           if field == "node":
1616             val = node
1617           elif field == "phys":
1618             val = vol['dev']
1619           elif field == "vg":
1620             val = vol['vg']
1621           elif field == "name":
1622             val = vol['name']
1623           elif field == "size":
1624             val = int(float(vol['size']))
1625           elif field == "instance":
1626             for inst in ilist:
1627               if node not in lv_by_node[inst]:
1628                 continue
1629               if vol['name'] in lv_by_node[inst][node]:
1630                 val = inst.name
1631                 break
1632             else:
1633               val = '-'
1634           else:
1635             raise errors.ParameterError(field)
1636           node_output.append(str(val))
1637
1638         output.append(node_output)
1639
1640     return output
1641
1642
1643 class LUAddNode(LogicalUnit):
1644   """Logical unit for adding node to the cluster.
1645
1646   """
1647   HPATH = "node-add"
1648   HTYPE = constants.HTYPE_NODE
1649   _OP_REQP = ["node_name"]
1650
1651   def BuildHooksEnv(self):
1652     """Build hooks env.
1653
1654     This will run on all nodes before, and on all nodes + the new node after.
1655
1656     """
1657     env = {
1658       "OP_TARGET": self.op.node_name,
1659       "NODE_NAME": self.op.node_name,
1660       "NODE_PIP": self.op.primary_ip,
1661       "NODE_SIP": self.op.secondary_ip,
1662       }
1663     nodes_0 = self.cfg.GetNodeList()
1664     nodes_1 = nodes_0 + [self.op.node_name, ]
1665     return env, nodes_0, nodes_1
1666
1667   def CheckPrereq(self):
1668     """Check prerequisites.
1669
1670     This checks:
1671      - the new node is not already in the config
1672      - it is resolvable
1673      - its parameters (single/dual homed) matches the cluster
1674
1675     Any errors are signalled by raising errors.OpPrereqError.
1676
1677     """
1678     node_name = self.op.node_name
1679     cfg = self.cfg
1680
1681     dns_data = utils.HostInfo(node_name)
1682
1683     node = dns_data.name
1684     primary_ip = self.op.primary_ip = dns_data.ip
1685     secondary_ip = getattr(self.op, "secondary_ip", None)
1686     if secondary_ip is None:
1687       secondary_ip = primary_ip
1688     if not utils.IsValidIP(secondary_ip):
1689       raise errors.OpPrereqError("Invalid secondary IP given")
1690     self.op.secondary_ip = secondary_ip
1691
1692     node_list = cfg.GetNodeList()
1693     if not self.op.readd and node in node_list:
1694       raise errors.OpPrereqError("Node %s is already in the configuration" %
1695                                  node)
1696     elif self.op.readd and node not in node_list:
1697       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1698
1699     for existing_node_name in node_list:
1700       existing_node = cfg.GetNodeInfo(existing_node_name)
1701
1702       if self.op.readd and node == existing_node_name:
1703         if (existing_node.primary_ip != primary_ip or
1704             existing_node.secondary_ip != secondary_ip):
1705           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1706                                      " address configuration as before")
1707         continue
1708
1709       if (existing_node.primary_ip == primary_ip or
1710           existing_node.secondary_ip == primary_ip or
1711           existing_node.primary_ip == secondary_ip or
1712           existing_node.secondary_ip == secondary_ip):
1713         raise errors.OpPrereqError("New node ip address(es) conflict with"
1714                                    " existing node %s" % existing_node.name)
1715
1716     # check that the type of the node (single versus dual homed) is the
1717     # same as for the master
1718     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1719     master_singlehomed = myself.secondary_ip == myself.primary_ip
1720     newbie_singlehomed = secondary_ip == primary_ip
1721     if master_singlehomed != newbie_singlehomed:
1722       if master_singlehomed:
1723         raise errors.OpPrereqError("The master has no private ip but the"
1724                                    " new node has one")
1725       else:
1726         raise errors.OpPrereqError("The master has a private ip but the"
1727                                    " new node doesn't have one")
1728
1729     # checks reachablity
1730     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1731       raise errors.OpPrereqError("Node not reachable by ping")
1732
1733     if not newbie_singlehomed:
1734       # check reachability from my secondary ip to newbie's secondary ip
1735       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1736                            source=myself.secondary_ip):
1737         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1738                                    " based ping to noded port")
1739
1740     self.new_node = objects.Node(name=node,
1741                                  primary_ip=primary_ip,
1742                                  secondary_ip=secondary_ip)
1743
1744     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1745       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1746         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1747                                    constants.VNC_PASSWORD_FILE)
1748
1749   def Exec(self, feedback_fn):
1750     """Adds the new node to the cluster.
1751
1752     """
1753     new_node = self.new_node
1754     node = new_node.name
1755
1756     # set up inter-node password and certificate and restarts the node daemon
1757     gntpass = self.sstore.GetNodeDaemonPassword()
1758     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1759       raise errors.OpExecError("ganeti password corruption detected")
1760     f = open(constants.SSL_CERT_FILE)
1761     try:
1762       gntpem = f.read(8192)
1763     finally:
1764       f.close()
1765     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1766     # so we use this to detect an invalid certificate; as long as the
1767     # cert doesn't contain this, the here-document will be correctly
1768     # parsed by the shell sequence below
1769     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1770       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1771     if not gntpem.endswith("\n"):
1772       raise errors.OpExecError("PEM must end with newline")
1773     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1774
1775     # and then connect with ssh to set password and start ganeti-noded
1776     # note that all the below variables are sanitized at this point,
1777     # either by being constants or by the checks above
1778     ss = self.sstore
1779     mycommand = ("umask 077 && "
1780                  "echo '%s' > '%s' && "
1781                  "cat > '%s' << '!EOF.' && \n"
1782                  "%s!EOF.\n%s restart" %
1783                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1784                   constants.SSL_CERT_FILE, gntpem,
1785                   constants.NODE_INITD_SCRIPT))
1786
1787     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1788     if result.failed:
1789       raise errors.OpExecError("Remote command on node %s, error: %s,"
1790                                " output: %s" %
1791                                (node, result.fail_reason, result.output))
1792
1793     # check connectivity
1794     time.sleep(4)
1795
1796     result = rpc.call_version([node])[node]
1797     if result:
1798       if constants.PROTOCOL_VERSION == result:
1799         logger.Info("communication to node %s fine, sw version %s match" %
1800                     (node, result))
1801       else:
1802         raise errors.OpExecError("Version mismatch master version %s,"
1803                                  " node version %s" %
1804                                  (constants.PROTOCOL_VERSION, result))
1805     else:
1806       raise errors.OpExecError("Cannot get version from the new node")
1807
1808     # setup ssh on node
1809     logger.Info("copy ssh key to node %s" % node)
1810     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1811     keyarray = []
1812     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1813                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1814                 priv_key, pub_key]
1815
1816     for i in keyfiles:
1817       f = open(i, 'r')
1818       try:
1819         keyarray.append(f.read())
1820       finally:
1821         f.close()
1822
1823     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1824                                keyarray[3], keyarray[4], keyarray[5])
1825
1826     if not result:
1827       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1828
1829     # Add node to our /etc/hosts, and add key to known_hosts
1830     _AddHostToEtcHosts(new_node.name)
1831
1832     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1833                       self.cfg.GetHostKey())
1834
1835     if new_node.secondary_ip != new_node.primary_ip:
1836       if not rpc.call_node_tcp_ping(new_node.name,
1837                                     constants.LOCALHOST_IP_ADDRESS,
1838                                     new_node.secondary_ip,
1839                                     constants.DEFAULT_NODED_PORT,
1840                                     10, False):
1841         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1842                                  " you gave (%s). Please fix and re-run this"
1843                                  " command." % new_node.secondary_ip)
1844
1845     success, msg = ssh.VerifyNodeHostname(node)
1846     if not success:
1847       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1848                                " than the one the resolver gives: %s."
1849                                " Please fix and re-run this command." %
1850                                (node, msg))
1851
1852     # Distribute updated /etc/hosts and known_hosts to all nodes,
1853     # including the node just added
1854     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1855     dist_nodes = self.cfg.GetNodeList()
1856     if not self.op.readd:
1857       dist_nodes.append(node)
1858     if myself.name in dist_nodes:
1859       dist_nodes.remove(myself.name)
1860
1861     logger.Debug("Copying hosts and known_hosts to all nodes")
1862     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1863       result = rpc.call_upload_file(dist_nodes, fname)
1864       for to_node in dist_nodes:
1865         if not result[to_node]:
1866           logger.Error("copy of file %s to node %s failed" %
1867                        (fname, to_node))
1868
1869     to_copy = ss.GetFileList()
1870     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1871       to_copy.append(constants.VNC_PASSWORD_FILE)
1872     for fname in to_copy:
1873       if not ssh.CopyFileToNode(node, fname):
1874         logger.Error("could not copy file %s to node %s" % (fname, node))
1875
1876     if not self.op.readd:
1877       logger.Info("adding node %s to cluster.conf" % node)
1878       self.cfg.AddNode(new_node)
1879
1880
1881 class LUMasterFailover(LogicalUnit):
1882   """Failover the master node to the current node.
1883
1884   This is a special LU in that it must run on a non-master node.
1885
1886   """
1887   HPATH = "master-failover"
1888   HTYPE = constants.HTYPE_CLUSTER
1889   REQ_MASTER = False
1890   _OP_REQP = []
1891
1892   def BuildHooksEnv(self):
1893     """Build hooks env.
1894
1895     This will run on the new master only in the pre phase, and on all
1896     the nodes in the post phase.
1897
1898     """
1899     env = {
1900       "OP_TARGET": self.new_master,
1901       "NEW_MASTER": self.new_master,
1902       "OLD_MASTER": self.old_master,
1903       }
1904     return env, [self.new_master], self.cfg.GetNodeList()
1905
1906   def CheckPrereq(self):
1907     """Check prerequisites.
1908
1909     This checks that we are not already the master.
1910
1911     """
1912     self.new_master = utils.HostInfo().name
1913     self.old_master = self.sstore.GetMasterNode()
1914
1915     if self.old_master == self.new_master:
1916       raise errors.OpPrereqError("This commands must be run on the node"
1917                                  " where you want the new master to be."
1918                                  " %s is already the master" %
1919                                  self.old_master)
1920
1921   def Exec(self, feedback_fn):
1922     """Failover the master node.
1923
1924     This command, when run on a non-master node, will cause the current
1925     master to cease being master, and the non-master to become new
1926     master.
1927
1928     """
1929     #TODO: do not rely on gethostname returning the FQDN
1930     logger.Info("setting master to %s, old master: %s" %
1931                 (self.new_master, self.old_master))
1932
1933     if not rpc.call_node_stop_master(self.old_master):
1934       logger.Error("could disable the master role on the old master"
1935                    " %s, please disable manually" % self.old_master)
1936
1937     ss = self.sstore
1938     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1939     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1940                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1941       logger.Error("could not distribute the new simple store master file"
1942                    " to the other nodes, please check.")
1943
1944     if not rpc.call_node_start_master(self.new_master):
1945       logger.Error("could not start the master role on the new master"
1946                    " %s, please check" % self.new_master)
1947       feedback_fn("Error in activating the master IP on the new master,"
1948                   " please fix manually.")
1949
1950
1951
1952 class LUQueryClusterInfo(NoHooksLU):
1953   """Query cluster configuration.
1954
1955   """
1956   _OP_REQP = []
1957   REQ_MASTER = False
1958
1959   def CheckPrereq(self):
1960     """No prerequsites needed for this LU.
1961
1962     """
1963     pass
1964
1965   def Exec(self, feedback_fn):
1966     """Return cluster config.
1967
1968     """
1969     result = {
1970       "name": self.sstore.GetClusterName(),
1971       "software_version": constants.RELEASE_VERSION,
1972       "protocol_version": constants.PROTOCOL_VERSION,
1973       "config_version": constants.CONFIG_VERSION,
1974       "os_api_version": constants.OS_API_VERSION,
1975       "export_version": constants.EXPORT_VERSION,
1976       "master": self.sstore.GetMasterNode(),
1977       "architecture": (platform.architecture()[0], platform.machine()),
1978       "hypervisor_type": self.sstore.GetHypervisorType(),
1979       }
1980
1981     return result
1982
1983
1984 class LUClusterCopyFile(NoHooksLU):
1985   """Copy file to cluster.
1986
1987   """
1988   _OP_REQP = ["nodes", "filename"]
1989
1990   def CheckPrereq(self):
1991     """Check prerequisites.
1992
1993     It should check that the named file exists and that the given list
1994     of nodes is valid.
1995
1996     """
1997     if not os.path.exists(self.op.filename):
1998       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1999
2000     self.nodes = _GetWantedNodes(self, self.op.nodes)
2001
2002   def Exec(self, feedback_fn):
2003     """Copy a file from master to some nodes.
2004
2005     Args:
2006       opts - class with options as members
2007       args - list containing a single element, the file name
2008     Opts used:
2009       nodes - list containing the name of target nodes; if empty, all nodes
2010
2011     """
2012     filename = self.op.filename
2013
2014     myname = utils.HostInfo().name
2015
2016     for node in self.nodes:
2017       if node == myname:
2018         continue
2019       if not ssh.CopyFileToNode(node, filename):
2020         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2021
2022
2023 class LUDumpClusterConfig(NoHooksLU):
2024   """Return a text-representation of the cluster-config.
2025
2026   """
2027   _OP_REQP = []
2028
2029   def CheckPrereq(self):
2030     """No prerequisites.
2031
2032     """
2033     pass
2034
2035   def Exec(self, feedback_fn):
2036     """Dump a representation of the cluster config to the standard output.
2037
2038     """
2039     return self.cfg.DumpConfig()
2040
2041
2042 class LURunClusterCommand(NoHooksLU):
2043   """Run a command on some nodes.
2044
2045   """
2046   _OP_REQP = ["command", "nodes"]
2047
2048   def CheckPrereq(self):
2049     """Check prerequisites.
2050
2051     It checks that the given list of nodes is valid.
2052
2053     """
2054     self.nodes = _GetWantedNodes(self, self.op.nodes)
2055
2056   def Exec(self, feedback_fn):
2057     """Run a command on some nodes.
2058
2059     """
2060     # put the master at the end of the nodes list
2061     master_node = self.sstore.GetMasterNode()
2062     if master_node in self.nodes:
2063       self.nodes.remove(master_node)
2064       self.nodes.append(master_node)
2065
2066     data = []
2067     for node in self.nodes:
2068       result = ssh.SSHCall(node, "root", self.op.command)
2069       data.append((node, result.output, result.exit_code))
2070
2071     return data
2072
2073
2074 class LUActivateInstanceDisks(NoHooksLU):
2075   """Bring up an instance's disks.
2076
2077   """
2078   _OP_REQP = ["instance_name"]
2079
2080   def CheckPrereq(self):
2081     """Check prerequisites.
2082
2083     This checks that the instance is in the cluster.
2084
2085     """
2086     instance = self.cfg.GetInstanceInfo(
2087       self.cfg.ExpandInstanceName(self.op.instance_name))
2088     if instance is None:
2089       raise errors.OpPrereqError("Instance '%s' not known" %
2090                                  self.op.instance_name)
2091     self.instance = instance
2092
2093
2094   def Exec(self, feedback_fn):
2095     """Activate the disks.
2096
2097     """
2098     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2099     if not disks_ok:
2100       raise errors.OpExecError("Cannot activate block devices")
2101
2102     return disks_info
2103
2104
2105 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2106   """Prepare the block devices for an instance.
2107
2108   This sets up the block devices on all nodes.
2109
2110   Args:
2111     instance: a ganeti.objects.Instance object
2112     ignore_secondaries: if true, errors on secondary nodes won't result
2113                         in an error return from the function
2114
2115   Returns:
2116     false if the operation failed
2117     list of (host, instance_visible_name, node_visible_name) if the operation
2118          suceeded with the mapping from node devices to instance devices
2119   """
2120   device_info = []
2121   disks_ok = True
2122   iname = instance.name
2123   # With the two passes mechanism we try to reduce the window of
2124   # opportunity for the race condition of switching DRBD to primary
2125   # before handshaking occured, but we do not eliminate it
2126
2127   # The proper fix would be to wait (with some limits) until the
2128   # connection has been made and drbd transitions from WFConnection
2129   # into any other network-connected state (Connected, SyncTarget,
2130   # SyncSource, etc.)
2131
2132   # 1st pass, assemble on all nodes in secondary mode
2133   for inst_disk in instance.disks:
2134     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2135       cfg.SetDiskID(node_disk, node)
2136       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2137       if not result:
2138         logger.Error("could not prepare block device %s on node %s"
2139                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2140         if not ignore_secondaries:
2141           disks_ok = False
2142
2143   # FIXME: race condition on drbd migration to primary
2144
2145   # 2nd pass, do only the primary node
2146   for inst_disk in instance.disks:
2147     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2148       if node != instance.primary_node:
2149         continue
2150       cfg.SetDiskID(node_disk, node)
2151       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2152       if not result:
2153         logger.Error("could not prepare block device %s on node %s"
2154                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2155         disks_ok = False
2156     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2157
2158   # leave the disks configured for the primary node
2159   # this is a workaround that would be fixed better by
2160   # improving the logical/physical id handling
2161   for disk in instance.disks:
2162     cfg.SetDiskID(disk, instance.primary_node)
2163
2164   return disks_ok, device_info
2165
2166
2167 def _StartInstanceDisks(cfg, instance, force):
2168   """Start the disks of an instance.
2169
2170   """
2171   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2172                                            ignore_secondaries=force)
2173   if not disks_ok:
2174     _ShutdownInstanceDisks(instance, cfg)
2175     if force is not None and not force:
2176       logger.Error("If the message above refers to a secondary node,"
2177                    " you can retry the operation using '--force'.")
2178     raise errors.OpExecError("Disk consistency error")
2179
2180
2181 class LUDeactivateInstanceDisks(NoHooksLU):
2182   """Shutdown an instance's disks.
2183
2184   """
2185   _OP_REQP = ["instance_name"]
2186
2187   def CheckPrereq(self):
2188     """Check prerequisites.
2189
2190     This checks that the instance is in the cluster.
2191
2192     """
2193     instance = self.cfg.GetInstanceInfo(
2194       self.cfg.ExpandInstanceName(self.op.instance_name))
2195     if instance is None:
2196       raise errors.OpPrereqError("Instance '%s' not known" %
2197                                  self.op.instance_name)
2198     self.instance = instance
2199
2200   def Exec(self, feedback_fn):
2201     """Deactivate the disks
2202
2203     """
2204     instance = self.instance
2205     ins_l = rpc.call_instance_list([instance.primary_node])
2206     ins_l = ins_l[instance.primary_node]
2207     if not type(ins_l) is list:
2208       raise errors.OpExecError("Can't contact node '%s'" %
2209                                instance.primary_node)
2210
2211     if self.instance.name in ins_l:
2212       raise errors.OpExecError("Instance is running, can't shutdown"
2213                                " block devices.")
2214
2215     _ShutdownInstanceDisks(instance, self.cfg)
2216
2217
2218 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2219   """Shutdown block devices of an instance.
2220
2221   This does the shutdown on all nodes of the instance.
2222
2223   If the ignore_primary is false, errors on the primary node are
2224   ignored.
2225
2226   """
2227   result = True
2228   for disk in instance.disks:
2229     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2230       cfg.SetDiskID(top_disk, node)
2231       if not rpc.call_blockdev_shutdown(node, top_disk):
2232         logger.Error("could not shutdown block device %s on node %s" %
2233                      (disk.iv_name, node))
2234         if not ignore_primary or node != instance.primary_node:
2235           result = False
2236   return result
2237
2238
2239 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2240   """Checks if a node has enough free memory.
2241
2242   This function check if a given node has the needed amount of free
2243   memory. In case the node has less memory or we cannot get the
2244   information from the node, this function raise an OpPrereqError
2245   exception.
2246
2247   Args:
2248     - cfg: a ConfigWriter instance
2249     - node: the node name
2250     - reason: string to use in the error message
2251     - requested: the amount of memory in MiB
2252
2253   """
2254   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2255   if not nodeinfo or not isinstance(nodeinfo, dict):
2256     raise errors.OpPrereqError("Could not contact node %s for resource"
2257                              " information" % (node,))
2258
2259   free_mem = nodeinfo[node].get('memory_free')
2260   if not isinstance(free_mem, int):
2261     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2262                              " was '%s'" % (node, free_mem))
2263   if requested > free_mem:
2264     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2265                              " needed %s MiB, available %s MiB" %
2266                              (node, reason, requested, free_mem))
2267
2268
2269 class LUStartupInstance(LogicalUnit):
2270   """Starts an instance.
2271
2272   """
2273   HPATH = "instance-start"
2274   HTYPE = constants.HTYPE_INSTANCE
2275   _OP_REQP = ["instance_name", "force"]
2276
2277   def BuildHooksEnv(self):
2278     """Build hooks env.
2279
2280     This runs on master, primary and secondary nodes of the instance.
2281
2282     """
2283     env = {
2284       "FORCE": self.op.force,
2285       }
2286     env.update(_BuildInstanceHookEnvByObject(self.instance))
2287     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2288           list(self.instance.secondary_nodes))
2289     return env, nl, nl
2290
2291   def CheckPrereq(self):
2292     """Check prerequisites.
2293
2294     This checks that the instance is in the cluster.
2295
2296     """
2297     instance = self.cfg.GetInstanceInfo(
2298       self.cfg.ExpandInstanceName(self.op.instance_name))
2299     if instance is None:
2300       raise errors.OpPrereqError("Instance '%s' not known" %
2301                                  self.op.instance_name)
2302
2303     # check bridges existance
2304     _CheckInstanceBridgesExist(instance)
2305
2306     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2307                          "starting instance %s" % instance.name,
2308                          instance.memory)
2309
2310     self.instance = instance
2311     self.op.instance_name = instance.name
2312
2313   def Exec(self, feedback_fn):
2314     """Start the instance.
2315
2316     """
2317     instance = self.instance
2318     force = self.op.force
2319     extra_args = getattr(self.op, "extra_args", "")
2320
2321     self.cfg.MarkInstanceUp(instance.name)
2322
2323     node_current = instance.primary_node
2324
2325     _StartInstanceDisks(self.cfg, instance, force)
2326
2327     if not rpc.call_instance_start(node_current, instance, extra_args):
2328       _ShutdownInstanceDisks(instance, self.cfg)
2329       raise errors.OpExecError("Could not start instance")
2330
2331
2332 class LURebootInstance(LogicalUnit):
2333   """Reboot an instance.
2334
2335   """
2336   HPATH = "instance-reboot"
2337   HTYPE = constants.HTYPE_INSTANCE
2338   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2339
2340   def BuildHooksEnv(self):
2341     """Build hooks env.
2342
2343     This runs on master, primary and secondary nodes of the instance.
2344
2345     """
2346     env = {
2347       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2348       }
2349     env.update(_BuildInstanceHookEnvByObject(self.instance))
2350     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2351           list(self.instance.secondary_nodes))
2352     return env, nl, nl
2353
2354   def CheckPrereq(self):
2355     """Check prerequisites.
2356
2357     This checks that the instance is in the cluster.
2358
2359     """
2360     instance = self.cfg.GetInstanceInfo(
2361       self.cfg.ExpandInstanceName(self.op.instance_name))
2362     if instance is None:
2363       raise errors.OpPrereqError("Instance '%s' not known" %
2364                                  self.op.instance_name)
2365
2366     # check bridges existance
2367     _CheckInstanceBridgesExist(instance)
2368
2369     self.instance = instance
2370     self.op.instance_name = instance.name
2371
2372   def Exec(self, feedback_fn):
2373     """Reboot the instance.
2374
2375     """
2376     instance = self.instance
2377     ignore_secondaries = self.op.ignore_secondaries
2378     reboot_type = self.op.reboot_type
2379     extra_args = getattr(self.op, "extra_args", "")
2380
2381     node_current = instance.primary_node
2382
2383     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2384                            constants.INSTANCE_REBOOT_HARD,
2385                            constants.INSTANCE_REBOOT_FULL]:
2386       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2387                                   (constants.INSTANCE_REBOOT_SOFT,
2388                                    constants.INSTANCE_REBOOT_HARD,
2389                                    constants.INSTANCE_REBOOT_FULL))
2390
2391     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2392                        constants.INSTANCE_REBOOT_HARD]:
2393       if not rpc.call_instance_reboot(node_current, instance,
2394                                       reboot_type, extra_args):
2395         raise errors.OpExecError("Could not reboot instance")
2396     else:
2397       if not rpc.call_instance_shutdown(node_current, instance):
2398         raise errors.OpExecError("could not shutdown instance for full reboot")
2399       _ShutdownInstanceDisks(instance, self.cfg)
2400       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2401       if not rpc.call_instance_start(node_current, instance, extra_args):
2402         _ShutdownInstanceDisks(instance, self.cfg)
2403         raise errors.OpExecError("Could not start instance for full reboot")
2404
2405     self.cfg.MarkInstanceUp(instance.name)
2406
2407
2408 class LUShutdownInstance(LogicalUnit):
2409   """Shutdown an instance.
2410
2411   """
2412   HPATH = "instance-stop"
2413   HTYPE = constants.HTYPE_INSTANCE
2414   _OP_REQP = ["instance_name"]
2415
2416   def BuildHooksEnv(self):
2417     """Build hooks env.
2418
2419     This runs on master, primary and secondary nodes of the instance.
2420
2421     """
2422     env = _BuildInstanceHookEnvByObject(self.instance)
2423     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2424           list(self.instance.secondary_nodes))
2425     return env, nl, nl
2426
2427   def CheckPrereq(self):
2428     """Check prerequisites.
2429
2430     This checks that the instance is in the cluster.
2431
2432     """
2433     instance = self.cfg.GetInstanceInfo(
2434       self.cfg.ExpandInstanceName(self.op.instance_name))
2435     if instance is None:
2436       raise errors.OpPrereqError("Instance '%s' not known" %
2437                                  self.op.instance_name)
2438     self.instance = instance
2439
2440   def Exec(self, feedback_fn):
2441     """Shutdown the instance.
2442
2443     """
2444     instance = self.instance
2445     node_current = instance.primary_node
2446     self.cfg.MarkInstanceDown(instance.name)
2447     if not rpc.call_instance_shutdown(node_current, instance):
2448       logger.Error("could not shutdown instance")
2449
2450     _ShutdownInstanceDisks(instance, self.cfg)
2451
2452
2453 class LUReinstallInstance(LogicalUnit):
2454   """Reinstall an instance.
2455
2456   """
2457   HPATH = "instance-reinstall"
2458   HTYPE = constants.HTYPE_INSTANCE
2459   _OP_REQP = ["instance_name"]
2460
2461   def BuildHooksEnv(self):
2462     """Build hooks env.
2463
2464     This runs on master, primary and secondary nodes of the instance.
2465
2466     """
2467     env = _BuildInstanceHookEnvByObject(self.instance)
2468     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2469           list(self.instance.secondary_nodes))
2470     return env, nl, nl
2471
2472   def CheckPrereq(self):
2473     """Check prerequisites.
2474
2475     This checks that the instance is in the cluster and is not running.
2476
2477     """
2478     instance = self.cfg.GetInstanceInfo(
2479       self.cfg.ExpandInstanceName(self.op.instance_name))
2480     if instance is None:
2481       raise errors.OpPrereqError("Instance '%s' not known" %
2482                                  self.op.instance_name)
2483     if instance.disk_template == constants.DT_DISKLESS:
2484       raise errors.OpPrereqError("Instance '%s' has no disks" %
2485                                  self.op.instance_name)
2486     if instance.status != "down":
2487       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2488                                  self.op.instance_name)
2489     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2490     if remote_info:
2491       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2492                                  (self.op.instance_name,
2493                                   instance.primary_node))
2494
2495     self.op.os_type = getattr(self.op, "os_type", None)
2496     if self.op.os_type is not None:
2497       # OS verification
2498       pnode = self.cfg.GetNodeInfo(
2499         self.cfg.ExpandNodeName(instance.primary_node))
2500       if pnode is None:
2501         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2502                                    self.op.pnode)
2503       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2504       if not os_obj:
2505         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2506                                    " primary node"  % self.op.os_type)
2507
2508     self.instance = instance
2509
2510   def Exec(self, feedback_fn):
2511     """Reinstall the instance.
2512
2513     """
2514     inst = self.instance
2515
2516     if self.op.os_type is not None:
2517       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2518       inst.os = self.op.os_type
2519       self.cfg.AddInstance(inst)
2520
2521     _StartInstanceDisks(self.cfg, inst, None)
2522     try:
2523       feedback_fn("Running the instance OS create scripts...")
2524       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2525         raise errors.OpExecError("Could not install OS for instance %s"
2526                                  " on node %s" %
2527                                  (inst.name, inst.primary_node))
2528     finally:
2529       _ShutdownInstanceDisks(inst, self.cfg)
2530
2531
2532 class LURenameInstance(LogicalUnit):
2533   """Rename an instance.
2534
2535   """
2536   HPATH = "instance-rename"
2537   HTYPE = constants.HTYPE_INSTANCE
2538   _OP_REQP = ["instance_name", "new_name"]
2539
2540   def BuildHooksEnv(self):
2541     """Build hooks env.
2542
2543     This runs on master, primary and secondary nodes of the instance.
2544
2545     """
2546     env = _BuildInstanceHookEnvByObject(self.instance)
2547     env["INSTANCE_NEW_NAME"] = self.op.new_name
2548     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2549           list(self.instance.secondary_nodes))
2550     return env, nl, nl
2551
2552   def CheckPrereq(self):
2553     """Check prerequisites.
2554
2555     This checks that the instance is in the cluster and is not running.
2556
2557     """
2558     instance = self.cfg.GetInstanceInfo(
2559       self.cfg.ExpandInstanceName(self.op.instance_name))
2560     if instance is None:
2561       raise errors.OpPrereqError("Instance '%s' not known" %
2562                                  self.op.instance_name)
2563     if instance.status != "down":
2564       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2565                                  self.op.instance_name)
2566     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2567     if remote_info:
2568       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2569                                  (self.op.instance_name,
2570                                   instance.primary_node))
2571     self.instance = instance
2572
2573     # new name verification
2574     name_info = utils.HostInfo(self.op.new_name)
2575
2576     self.op.new_name = new_name = name_info.name
2577     instance_list = self.cfg.GetInstanceList()
2578     if new_name in instance_list:
2579       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2580                                  new_name)
2581
2582     if not getattr(self.op, "ignore_ip", False):
2583       command = ["fping", "-q", name_info.ip]
2584       result = utils.RunCmd(command)
2585       if not result.failed:
2586         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2587                                    (name_info.ip, new_name))
2588
2589
2590   def Exec(self, feedback_fn):
2591     """Reinstall the instance.
2592
2593     """
2594     inst = self.instance
2595     old_name = inst.name
2596
2597     self.cfg.RenameInstance(inst.name, self.op.new_name)
2598
2599     # re-read the instance from the configuration after rename
2600     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2601
2602     _StartInstanceDisks(self.cfg, inst, None)
2603     try:
2604       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2605                                           "sda", "sdb"):
2606         msg = ("Could run OS rename script for instance %s on node %s (but the"
2607                " instance has been renamed in Ganeti)" %
2608                (inst.name, inst.primary_node))
2609         logger.Error(msg)
2610     finally:
2611       _ShutdownInstanceDisks(inst, self.cfg)
2612
2613
2614 class LURemoveInstance(LogicalUnit):
2615   """Remove an instance.
2616
2617   """
2618   HPATH = "instance-remove"
2619   HTYPE = constants.HTYPE_INSTANCE
2620   _OP_REQP = ["instance_name", "ignore_failures"]
2621
2622   def BuildHooksEnv(self):
2623     """Build hooks env.
2624
2625     This runs on master, primary and secondary nodes of the instance.
2626
2627     """
2628     env = _BuildInstanceHookEnvByObject(self.instance)
2629     nl = [self.sstore.GetMasterNode()]
2630     return env, nl, nl
2631
2632   def CheckPrereq(self):
2633     """Check prerequisites.
2634
2635     This checks that the instance is in the cluster.
2636
2637     """
2638     instance = self.cfg.GetInstanceInfo(
2639       self.cfg.ExpandInstanceName(self.op.instance_name))
2640     if instance is None:
2641       raise errors.OpPrereqError("Instance '%s' not known" %
2642                                  self.op.instance_name)
2643     self.instance = instance
2644
2645   def Exec(self, feedback_fn):
2646     """Remove the instance.
2647
2648     """
2649     instance = self.instance
2650     logger.Info("shutting down instance %s on node %s" %
2651                 (instance.name, instance.primary_node))
2652
2653     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2654       if self.op.ignore_failures:
2655         feedback_fn("Warning: can't shutdown instance")
2656       else:
2657         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2658                                  (instance.name, instance.primary_node))
2659
2660     logger.Info("removing block devices for instance %s" % instance.name)
2661
2662     if not _RemoveDisks(instance, self.cfg):
2663       if self.op.ignore_failures:
2664         feedback_fn("Warning: can't remove instance's disks")
2665       else:
2666         raise errors.OpExecError("Can't remove instance's disks")
2667
2668     logger.Info("removing instance %s out of cluster config" % instance.name)
2669
2670     self.cfg.RemoveInstance(instance.name)
2671
2672
2673 class LUQueryInstances(NoHooksLU):
2674   """Logical unit for querying instances.
2675
2676   """
2677   _OP_REQP = ["output_fields", "names"]
2678
2679   def CheckPrereq(self):
2680     """Check prerequisites.
2681
2682     This checks that the fields required are valid output fields.
2683
2684     """
2685     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2686     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2687                                "admin_state", "admin_ram",
2688                                "disk_template", "ip", "mac", "bridge",
2689                                "sda_size", "sdb_size", "vcpus"],
2690                        dynamic=self.dynamic_fields,
2691                        selected=self.op.output_fields)
2692
2693     self.wanted = _GetWantedInstances(self, self.op.names)
2694
2695   def Exec(self, feedback_fn):
2696     """Computes the list of nodes and their attributes.
2697
2698     """
2699     instance_names = self.wanted
2700     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2701                      in instance_names]
2702
2703     # begin data gathering
2704
2705     nodes = frozenset([inst.primary_node for inst in instance_list])
2706
2707     bad_nodes = []
2708     if self.dynamic_fields.intersection(self.op.output_fields):
2709       live_data = {}
2710       node_data = rpc.call_all_instances_info(nodes)
2711       for name in nodes:
2712         result = node_data[name]
2713         if result:
2714           live_data.update(result)
2715         elif result == False:
2716           bad_nodes.append(name)
2717         # else no instance is alive
2718     else:
2719       live_data = dict([(name, {}) for name in instance_names])
2720
2721     # end data gathering
2722
2723     output = []
2724     for instance in instance_list:
2725       iout = []
2726       for field in self.op.output_fields:
2727         if field == "name":
2728           val = instance.name
2729         elif field == "os":
2730           val = instance.os
2731         elif field == "pnode":
2732           val = instance.primary_node
2733         elif field == "snodes":
2734           val = list(instance.secondary_nodes)
2735         elif field == "admin_state":
2736           val = (instance.status != "down")
2737         elif field == "oper_state":
2738           if instance.primary_node in bad_nodes:
2739             val = None
2740           else:
2741             val = bool(live_data.get(instance.name))
2742         elif field == "status":
2743           if instance.primary_node in bad_nodes:
2744             val = "ERROR_nodedown"
2745           else:
2746             running = bool(live_data.get(instance.name))
2747             if running:
2748               if instance.status != "down":
2749                 val = "running"
2750               else:
2751                 val = "ERROR_up"
2752             else:
2753               if instance.status != "down":
2754                 val = "ERROR_down"
2755               else:
2756                 val = "ADMIN_down"
2757         elif field == "admin_ram":
2758           val = instance.memory
2759         elif field == "oper_ram":
2760           if instance.primary_node in bad_nodes:
2761             val = None
2762           elif instance.name in live_data:
2763             val = live_data[instance.name].get("memory", "?")
2764           else:
2765             val = "-"
2766         elif field == "disk_template":
2767           val = instance.disk_template
2768         elif field == "ip":
2769           val = instance.nics[0].ip
2770         elif field == "bridge":
2771           val = instance.nics[0].bridge
2772         elif field == "mac":
2773           val = instance.nics[0].mac
2774         elif field == "sda_size" or field == "sdb_size":
2775           disk = instance.FindDisk(field[:3])
2776           if disk is None:
2777             val = None
2778           else:
2779             val = disk.size
2780         elif field == "vcpus":
2781           val = instance.vcpus
2782         else:
2783           raise errors.ParameterError(field)
2784         iout.append(val)
2785       output.append(iout)
2786
2787     return output
2788
2789
2790 class LUFailoverInstance(LogicalUnit):
2791   """Failover an instance.
2792
2793   """
2794   HPATH = "instance-failover"
2795   HTYPE = constants.HTYPE_INSTANCE
2796   _OP_REQP = ["instance_name", "ignore_consistency"]
2797
2798   def BuildHooksEnv(self):
2799     """Build hooks env.
2800
2801     This runs on master, primary and secondary nodes of the instance.
2802
2803     """
2804     env = {
2805       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2806       }
2807     env.update(_BuildInstanceHookEnvByObject(self.instance))
2808     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2809     return env, nl, nl
2810
2811   def CheckPrereq(self):
2812     """Check prerequisites.
2813
2814     This checks that the instance is in the cluster.
2815
2816     """
2817     instance = self.cfg.GetInstanceInfo(
2818       self.cfg.ExpandInstanceName(self.op.instance_name))
2819     if instance is None:
2820       raise errors.OpPrereqError("Instance '%s' not known" %
2821                                  self.op.instance_name)
2822
2823     if instance.disk_template not in constants.DTS_NET_MIRROR:
2824       raise errors.OpPrereqError("Instance's disk layout is not"
2825                                  " network mirrored, cannot failover.")
2826
2827     secondary_nodes = instance.secondary_nodes
2828     if not secondary_nodes:
2829       raise errors.ProgrammerError("no secondary node but using "
2830                                    "DT_REMOTE_RAID1 template")
2831
2832     target_node = secondary_nodes[0]
2833     # check memory requirements on the secondary node
2834     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2835                          instance.name, instance.memory)
2836
2837     # check bridge existance
2838     brlist = [nic.bridge for nic in instance.nics]
2839     if not rpc.call_bridges_exist(target_node, brlist):
2840       raise errors.OpPrereqError("One or more target bridges %s does not"
2841                                  " exist on destination node '%s'" %
2842                                  (brlist, target_node))
2843
2844     self.instance = instance
2845
2846   def Exec(self, feedback_fn):
2847     """Failover an instance.
2848
2849     The failover is done by shutting it down on its present node and
2850     starting it on the secondary.
2851
2852     """
2853     instance = self.instance
2854
2855     source_node = instance.primary_node
2856     target_node = instance.secondary_nodes[0]
2857
2858     feedback_fn("* checking disk consistency between source and target")
2859     for dev in instance.disks:
2860       # for remote_raid1, these are md over drbd
2861       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2862         if instance.status == "up" and not self.op.ignore_consistency:
2863           raise errors.OpExecError("Disk %s is degraded on target node,"
2864                                    " aborting failover." % dev.iv_name)
2865
2866     feedback_fn("* shutting down instance on source node")
2867     logger.Info("Shutting down instance %s on node %s" %
2868                 (instance.name, source_node))
2869
2870     if not rpc.call_instance_shutdown(source_node, instance):
2871       if self.op.ignore_consistency:
2872         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2873                      " anyway. Please make sure node %s is down"  %
2874                      (instance.name, source_node, source_node))
2875       else:
2876         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2877                                  (instance.name, source_node))
2878
2879     feedback_fn("* deactivating the instance's disks on source node")
2880     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2881       raise errors.OpExecError("Can't shut down the instance's disks.")
2882
2883     instance.primary_node = target_node
2884     # distribute new instance config to the other nodes
2885     self.cfg.Update(instance)
2886
2887     # Only start the instance if it's marked as up
2888     if instance.status == "up":
2889       feedback_fn("* activating the instance's disks on target node")
2890       logger.Info("Starting instance %s on node %s" %
2891                   (instance.name, target_node))
2892
2893       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2894                                                ignore_secondaries=True)
2895       if not disks_ok:
2896         _ShutdownInstanceDisks(instance, self.cfg)
2897         raise errors.OpExecError("Can't activate the instance's disks")
2898
2899       feedback_fn("* starting the instance on the target node")
2900       if not rpc.call_instance_start(target_node, instance, None):
2901         _ShutdownInstanceDisks(instance, self.cfg)
2902         raise errors.OpExecError("Could not start instance %s on node %s." %
2903                                  (instance.name, target_node))
2904
2905
2906 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2907   """Create a tree of block devices on the primary node.
2908
2909   This always creates all devices.
2910
2911   """
2912   if device.children:
2913     for child in device.children:
2914       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2915         return False
2916
2917   cfg.SetDiskID(device, node)
2918   new_id = rpc.call_blockdev_create(node, device, device.size,
2919                                     instance.name, True, info)
2920   if not new_id:
2921     return False
2922   if device.physical_id is None:
2923     device.physical_id = new_id
2924   return True
2925
2926
2927 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2928   """Create a tree of block devices on a secondary node.
2929
2930   If this device type has to be created on secondaries, create it and
2931   all its children.
2932
2933   If not, just recurse to children keeping the same 'force' value.
2934
2935   """
2936   if device.CreateOnSecondary():
2937     force = True
2938   if device.children:
2939     for child in device.children:
2940       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2941                                         child, force, info):
2942         return False
2943
2944   if not force:
2945     return True
2946   cfg.SetDiskID(device, node)
2947   new_id = rpc.call_blockdev_create(node, device, device.size,
2948                                     instance.name, False, info)
2949   if not new_id:
2950     return False
2951   if device.physical_id is None:
2952     device.physical_id = new_id
2953   return True
2954
2955
2956 def _GenerateUniqueNames(cfg, exts):
2957   """Generate a suitable LV name.
2958
2959   This will generate a logical volume name for the given instance.
2960
2961   """
2962   results = []
2963   for val in exts:
2964     new_id = cfg.GenerateUniqueID()
2965     results.append("%s%s" % (new_id, val))
2966   return results
2967
2968
2969 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2970   """Generate a drbd device complete with its children.
2971
2972   """
2973   port = cfg.AllocatePort()
2974   vgname = cfg.GetVGName()
2975   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2976                           logical_id=(vgname, names[0]))
2977   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2978                           logical_id=(vgname, names[1]))
2979   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2980                           logical_id = (primary, secondary, port),
2981                           children = [dev_data, dev_meta])
2982   return drbd_dev
2983
2984
2985 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2986   """Generate a drbd8 device complete with its children.
2987
2988   """
2989   port = cfg.AllocatePort()
2990   vgname = cfg.GetVGName()
2991   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2992                           logical_id=(vgname, names[0]))
2993   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2994                           logical_id=(vgname, names[1]))
2995   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2996                           logical_id = (primary, secondary, port),
2997                           children = [dev_data, dev_meta],
2998                           iv_name=iv_name)
2999   return drbd_dev
3000
3001 def _GenerateDiskTemplate(cfg, template_name,
3002                           instance_name, primary_node,
3003                           secondary_nodes, disk_sz, swap_sz):
3004   """Generate the entire disk layout for a given template type.
3005
3006   """
3007   #TODO: compute space requirements
3008
3009   vgname = cfg.GetVGName()
3010   if template_name == constants.DT_DISKLESS:
3011     disks = []
3012   elif template_name == constants.DT_PLAIN:
3013     if len(secondary_nodes) != 0:
3014       raise errors.ProgrammerError("Wrong template configuration")
3015
3016     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3017     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3018                            logical_id=(vgname, names[0]),
3019                            iv_name = "sda")
3020     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3021                            logical_id=(vgname, names[1]),
3022                            iv_name = "sdb")
3023     disks = [sda_dev, sdb_dev]
3024   elif template_name == constants.DT_LOCAL_RAID1:
3025     if len(secondary_nodes) != 0:
3026       raise errors.ProgrammerError("Wrong template configuration")
3027
3028
3029     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3030                                        ".sdb_m1", ".sdb_m2"])
3031     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3032                               logical_id=(vgname, names[0]))
3033     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3034                               logical_id=(vgname, names[1]))
3035     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3036                               size=disk_sz,
3037                               children = [sda_dev_m1, sda_dev_m2])
3038     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3039                               logical_id=(vgname, names[2]))
3040     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3041                               logical_id=(vgname, names[3]))
3042     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3043                               size=swap_sz,
3044                               children = [sdb_dev_m1, sdb_dev_m2])
3045     disks = [md_sda_dev, md_sdb_dev]
3046   elif template_name == constants.DT_REMOTE_RAID1:
3047     if len(secondary_nodes) != 1:
3048       raise errors.ProgrammerError("Wrong template configuration")
3049     remote_node = secondary_nodes[0]
3050     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3051                                        ".sdb_data", ".sdb_meta"])
3052     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3053                                          disk_sz, names[0:2])
3054     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3055                               children = [drbd_sda_dev], size=disk_sz)
3056     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3057                                          swap_sz, names[2:4])
3058     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3059                               children = [drbd_sdb_dev], size=swap_sz)
3060     disks = [md_sda_dev, md_sdb_dev]
3061   elif template_name == constants.DT_DRBD8:
3062     if len(secondary_nodes) != 1:
3063       raise errors.ProgrammerError("Wrong template configuration")
3064     remote_node = secondary_nodes[0]
3065     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3066                                        ".sdb_data", ".sdb_meta"])
3067     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3068                                          disk_sz, names[0:2], "sda")
3069     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3070                                          swap_sz, names[2:4], "sdb")
3071     disks = [drbd_sda_dev, drbd_sdb_dev]
3072   else:
3073     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3074   return disks
3075
3076
3077 def _GetInstanceInfoText(instance):
3078   """Compute that text that should be added to the disk's metadata.
3079
3080   """
3081   return "originstname+%s" % instance.name
3082
3083
3084 def _CreateDisks(cfg, instance):
3085   """Create all disks for an instance.
3086
3087   This abstracts away some work from AddInstance.
3088
3089   Args:
3090     instance: the instance object
3091
3092   Returns:
3093     True or False showing the success of the creation process
3094
3095   """
3096   info = _GetInstanceInfoText(instance)
3097
3098   for device in instance.disks:
3099     logger.Info("creating volume %s for instance %s" %
3100               (device.iv_name, instance.name))
3101     #HARDCODE
3102     for secondary_node in instance.secondary_nodes:
3103       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3104                                         device, False, info):
3105         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3106                      (device.iv_name, device, secondary_node))
3107         return False
3108     #HARDCODE
3109     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3110                                     instance, device, info):
3111       logger.Error("failed to create volume %s on primary!" %
3112                    device.iv_name)
3113       return False
3114   return True
3115
3116
3117 def _RemoveDisks(instance, cfg):
3118   """Remove all disks for an instance.
3119
3120   This abstracts away some work from `AddInstance()` and
3121   `RemoveInstance()`. Note that in case some of the devices couldn't
3122   be removed, the removal will continue with the other ones (compare
3123   with `_CreateDisks()`).
3124
3125   Args:
3126     instance: the instance object
3127
3128   Returns:
3129     True or False showing the success of the removal proces
3130
3131   """
3132   logger.Info("removing block devices for instance %s" % instance.name)
3133
3134   result = True
3135   for device in instance.disks:
3136     for node, disk in device.ComputeNodeTree(instance.primary_node):
3137       cfg.SetDiskID(disk, node)
3138       if not rpc.call_blockdev_remove(node, disk):
3139         logger.Error("could not remove block device %s on node %s,"
3140                      " continuing anyway" %
3141                      (device.iv_name, node))
3142         result = False
3143   return result
3144
3145
3146 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3147   """Compute disk size requirements in the volume group
3148
3149   This is currently hard-coded for the two-drive layout.
3150
3151   """
3152   # Required free disk space as a function of disk and swap space
3153   req_size_dict = {
3154     constants.DT_DISKLESS: None,
3155     constants.DT_PLAIN: disk_size + swap_size,
3156     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3157     # 256 MB are added for drbd metadata, 128MB for each drbd device
3158     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3159     constants.DT_DRBD8: disk_size + swap_size + 256,
3160   }
3161
3162   if disk_template not in req_size_dict:
3163     raise errors.ProgrammerError("Disk template '%s' size requirement"
3164                                  " is unknown" %  disk_template)
3165
3166   return req_size_dict[disk_template]
3167
3168
3169 class LUCreateInstance(LogicalUnit):
3170   """Create an instance.
3171
3172   """
3173   HPATH = "instance-add"
3174   HTYPE = constants.HTYPE_INSTANCE
3175   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3176               "disk_template", "swap_size", "mode", "start", "vcpus",
3177               "wait_for_sync", "ip_check", "mac"]
3178
3179   def _RunAllocator(self):
3180     """Run the allocator based on input opcode.
3181
3182     """
3183     disks = [{"size": self.op.disk_size, "mode": "w"},
3184              {"size": self.op.swap_size, "mode": "w"}]
3185     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3186              "bridge": self.op.bridge}]
3187     ial = IAllocator(self.cfg, self.sstore,
3188                      mode=constants.IALLOCATOR_MODE_ALLOC,
3189                      name=self.op.instance_name,
3190                      disk_template=self.op.disk_template,
3191                      tags=[],
3192                      os=self.op.os_type,
3193                      vcpus=self.op.vcpus,
3194                      mem_size=self.op.mem_size,
3195                      disks=disks,
3196                      nics=nics,
3197                      )
3198
3199     ial.Run(self.op.iallocator)
3200
3201     if not ial.success:
3202       raise errors.OpPrereqError("Can't compute nodes using"
3203                                  " iallocator '%s': %s" % (self.op.iallocator,
3204                                                            ial.info))
3205     if len(ial.nodes) != ial.required_nodes:
3206       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3207                                  " of nodes (%s), required %s" %
3208                                  (len(ial.nodes), ial.required_nodes))
3209     self.op.pnode = ial.nodes[0]
3210     logger.ToStdout("Selected nodes for the instance: %s" %
3211                     (", ".join(ial.nodes),))
3212     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3213                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3214     if ial.required_nodes == 2:
3215       self.op.snode = ial.nodes[1]
3216
3217   def BuildHooksEnv(self):
3218     """Build hooks env.
3219
3220     This runs on master, primary and secondary nodes of the instance.
3221
3222     """
3223     env = {
3224       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3225       "INSTANCE_DISK_SIZE": self.op.disk_size,
3226       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3227       "INSTANCE_ADD_MODE": self.op.mode,
3228       }
3229     if self.op.mode == constants.INSTANCE_IMPORT:
3230       env["INSTANCE_SRC_NODE"] = self.op.src_node
3231       env["INSTANCE_SRC_PATH"] = self.op.src_path
3232       env["INSTANCE_SRC_IMAGE"] = self.src_image
3233
3234     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3235       primary_node=self.op.pnode,
3236       secondary_nodes=self.secondaries,
3237       status=self.instance_status,
3238       os_type=self.op.os_type,
3239       memory=self.op.mem_size,
3240       vcpus=self.op.vcpus,
3241       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3242     ))
3243
3244     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3245           self.secondaries)
3246     return env, nl, nl
3247
3248
3249   def CheckPrereq(self):
3250     """Check prerequisites.
3251
3252     """
3253     # set optional parameters to none if they don't exist
3254     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3255                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3256                  "vnc_bind_address"]:
3257       if not hasattr(self.op, attr):
3258         setattr(self.op, attr, None)
3259
3260     if self.op.mode not in (constants.INSTANCE_CREATE,
3261                             constants.INSTANCE_IMPORT):
3262       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3263                                  self.op.mode)
3264
3265     if self.op.mode == constants.INSTANCE_IMPORT:
3266       src_node = getattr(self.op, "src_node", None)
3267       src_path = getattr(self.op, "src_path", None)
3268       if src_node is None or src_path is None:
3269         raise errors.OpPrereqError("Importing an instance requires source"
3270                                    " node and path options")
3271       src_node_full = self.cfg.ExpandNodeName(src_node)
3272       if src_node_full is None:
3273         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3274       self.op.src_node = src_node = src_node_full
3275
3276       if not os.path.isabs(src_path):
3277         raise errors.OpPrereqError("The source path must be absolute")
3278
3279       export_info = rpc.call_export_info(src_node, src_path)
3280
3281       if not export_info:
3282         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3283
3284       if not export_info.has_section(constants.INISECT_EXP):
3285         raise errors.ProgrammerError("Corrupted export config")
3286
3287       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3288       if (int(ei_version) != constants.EXPORT_VERSION):
3289         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3290                                    (ei_version, constants.EXPORT_VERSION))
3291
3292       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3293         raise errors.OpPrereqError("Can't import instance with more than"
3294                                    " one data disk")
3295
3296       # FIXME: are the old os-es, disk sizes, etc. useful?
3297       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3298       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3299                                                          'disk0_dump'))
3300       self.src_image = diskimage
3301     else: # INSTANCE_CREATE
3302       if getattr(self.op, "os_type", None) is None:
3303         raise errors.OpPrereqError("No guest OS specified")
3304
3305     #### instance parameters check
3306
3307     # disk template and mirror node verification
3308     if self.op.disk_template not in constants.DISK_TEMPLATES:
3309       raise errors.OpPrereqError("Invalid disk template name")
3310
3311     # instance name verification
3312     hostname1 = utils.HostInfo(self.op.instance_name)
3313
3314     self.op.instance_name = instance_name = hostname1.name
3315     instance_list = self.cfg.GetInstanceList()
3316     if instance_name in instance_list:
3317       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3318                                  instance_name)
3319
3320     # ip validity checks
3321     ip = getattr(self.op, "ip", None)
3322     if ip is None or ip.lower() == "none":
3323       inst_ip = None
3324     elif ip.lower() == "auto":
3325       inst_ip = hostname1.ip
3326     else:
3327       if not utils.IsValidIP(ip):
3328         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3329                                    " like a valid IP" % ip)
3330       inst_ip = ip
3331     self.inst_ip = self.op.ip = inst_ip
3332
3333     if self.op.start and not self.op.ip_check:
3334       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3335                                  " adding an instance in start mode")
3336
3337     if self.op.ip_check:
3338       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3339         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3340                                    (hostname1.ip, instance_name))
3341
3342     # MAC address verification
3343     if self.op.mac != "auto":
3344       if not utils.IsValidMac(self.op.mac.lower()):
3345         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3346                                    self.op.mac)
3347
3348     # bridge verification
3349     bridge = getattr(self.op, "bridge", None)
3350     if bridge is None:
3351       self.op.bridge = self.cfg.GetDefBridge()
3352     else:
3353       self.op.bridge = bridge
3354
3355     # boot order verification
3356     if self.op.hvm_boot_order is not None:
3357       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3358         raise errors.OpPrereqError("invalid boot order specified,"
3359                                    " must be one or more of [acdn]")
3360     #### allocator run
3361
3362     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3363       raise errors.OpPrereqError("One and only one of iallocator and primary"
3364                                  " node must be given")
3365
3366     if self.op.iallocator is not None:
3367       self._RunAllocator()
3368
3369     #### node related checks
3370
3371     # check primary node
3372     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3373     if pnode is None:
3374       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3375                                  self.op.pnode)
3376     self.op.pnode = pnode.name
3377     self.pnode = pnode
3378     self.secondaries = []
3379
3380     # mirror node verification
3381     if self.op.disk_template in constants.DTS_NET_MIRROR:
3382       if getattr(self.op, "snode", None) is None:
3383         raise errors.OpPrereqError("The networked disk templates need"
3384                                    " a mirror node")
3385
3386       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3387       if snode_name is None:
3388         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3389                                    self.op.snode)
3390       elif snode_name == pnode.name:
3391         raise errors.OpPrereqError("The secondary node cannot be"
3392                                    " the primary node.")
3393       self.secondaries.append(snode_name)
3394
3395     req_size = _ComputeDiskSize(self.op.disk_template,
3396                                 self.op.disk_size, self.op.swap_size)
3397
3398     # Check lv size requirements
3399     if req_size is not None:
3400       nodenames = [pnode.name] + self.secondaries
3401       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3402       for node in nodenames:
3403         info = nodeinfo.get(node, None)
3404         if not info:
3405           raise errors.OpPrereqError("Cannot get current information"
3406                                      " from node '%s'" % node)
3407         vg_free = info.get('vg_free', None)
3408         if not isinstance(vg_free, int):
3409           raise errors.OpPrereqError("Can't compute free disk space on"
3410                                      " node %s" % node)
3411         if req_size > info['vg_free']:
3412           raise errors.OpPrereqError("Not enough disk space on target node %s."
3413                                      " %d MB available, %d MB required" %
3414                                      (node, info['vg_free'], req_size))
3415
3416     # os verification
3417     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3418     if not os_obj:
3419       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3420                                  " primary node"  % self.op.os_type)
3421
3422     if self.op.kernel_path == constants.VALUE_NONE:
3423       raise errors.OpPrereqError("Can't set instance kernel to none")
3424
3425
3426     # bridge check on primary node
3427     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3428       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3429                                  " destination node '%s'" %
3430                                  (self.op.bridge, pnode.name))
3431
3432     # memory check on primary node
3433     if self.op.start:
3434       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3435                            "creating instance %s" % self.op.instance_name,
3436                            self.op.mem_size)
3437
3438     # hvm_cdrom_image_path verification
3439     if self.op.hvm_cdrom_image_path is not None:
3440       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3441         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3442                                    " be an absolute path or None, not %s" %
3443                                    self.op.hvm_cdrom_image_path)
3444       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3445         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3446                                    " regular file or a symlink pointing to"
3447                                    " an existing regular file, not %s" %
3448                                    self.op.hvm_cdrom_image_path)
3449
3450     # vnc_bind_address verification
3451     if self.op.vnc_bind_address is not None:
3452       if not utils.IsValidIP(self.op.vnc_bind_address):
3453         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3454                                    " like a valid IP address" %
3455                                    self.op.vnc_bind_address)
3456
3457     if self.op.start:
3458       self.instance_status = 'up'
3459     else:
3460       self.instance_status = 'down'
3461
3462   def Exec(self, feedback_fn):
3463     """Create and add the instance to the cluster.
3464
3465     """
3466     instance = self.op.instance_name
3467     pnode_name = self.pnode.name
3468
3469     if self.op.mac == "auto":
3470       mac_address = self.cfg.GenerateMAC()
3471     else:
3472       mac_address = self.op.mac
3473
3474     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3475     if self.inst_ip is not None:
3476       nic.ip = self.inst_ip
3477
3478     ht_kind = self.sstore.GetHypervisorType()
3479     if ht_kind in constants.HTS_REQ_PORT:
3480       network_port = self.cfg.AllocatePort()
3481     else:
3482       network_port = None
3483
3484     if self.op.vnc_bind_address is None:
3485       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3486
3487     disks = _GenerateDiskTemplate(self.cfg,
3488                                   self.op.disk_template,
3489                                   instance, pnode_name,
3490                                   self.secondaries, self.op.disk_size,
3491                                   self.op.swap_size)
3492
3493     iobj = objects.Instance(name=instance, os=self.op.os_type,
3494                             primary_node=pnode_name,
3495                             memory=self.op.mem_size,
3496                             vcpus=self.op.vcpus,
3497                             nics=[nic], disks=disks,
3498                             disk_template=self.op.disk_template,
3499                             status=self.instance_status,
3500                             network_port=network_port,
3501                             kernel_path=self.op.kernel_path,
3502                             initrd_path=self.op.initrd_path,
3503                             hvm_boot_order=self.op.hvm_boot_order,
3504                             hvm_acpi=self.op.hvm_acpi,
3505                             hvm_pae=self.op.hvm_pae,
3506                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3507                             vnc_bind_address=self.op.vnc_bind_address,
3508                             )
3509
3510     feedback_fn("* creating instance disks...")
3511     if not _CreateDisks(self.cfg, iobj):
3512       _RemoveDisks(iobj, self.cfg)
3513       raise errors.OpExecError("Device creation failed, reverting...")
3514
3515     feedback_fn("adding instance %s to cluster config" % instance)
3516
3517     self.cfg.AddInstance(iobj)
3518
3519     if self.op.wait_for_sync:
3520       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3521     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3522       # make sure the disks are not degraded (still sync-ing is ok)
3523       time.sleep(15)
3524       feedback_fn("* checking mirrors status")
3525       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3526     else:
3527       disk_abort = False
3528
3529     if disk_abort:
3530       _RemoveDisks(iobj, self.cfg)
3531       self.cfg.RemoveInstance(iobj.name)
3532       raise errors.OpExecError("There are some degraded disks for"
3533                                " this instance")
3534
3535     feedback_fn("creating os for instance %s on node %s" %
3536                 (instance, pnode_name))
3537
3538     if iobj.disk_template != constants.DT_DISKLESS:
3539       if self.op.mode == constants.INSTANCE_CREATE:
3540         feedback_fn("* running the instance OS create scripts...")
3541         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3542           raise errors.OpExecError("could not add os for instance %s"
3543                                    " on node %s" %
3544                                    (instance, pnode_name))
3545
3546       elif self.op.mode == constants.INSTANCE_IMPORT:
3547         feedback_fn("* running the instance OS import scripts...")
3548         src_node = self.op.src_node
3549         src_image = self.src_image
3550         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3551                                                 src_node, src_image):
3552           raise errors.OpExecError("Could not import os for instance"
3553                                    " %s on node %s" %
3554                                    (instance, pnode_name))
3555       else:
3556         # also checked in the prereq part
3557         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3558                                      % self.op.mode)
3559
3560     if self.op.start:
3561       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3562       feedback_fn("* starting instance...")
3563       if not rpc.call_instance_start(pnode_name, iobj, None):
3564         raise errors.OpExecError("Could not start instance")
3565
3566
3567 class LUConnectConsole(NoHooksLU):
3568   """Connect to an instance's console.
3569
3570   This is somewhat special in that it returns the command line that
3571   you need to run on the master node in order to connect to the
3572   console.
3573
3574   """
3575   _OP_REQP = ["instance_name"]
3576
3577   def CheckPrereq(self):
3578     """Check prerequisites.
3579
3580     This checks that the instance is in the cluster.
3581
3582     """
3583     instance = self.cfg.GetInstanceInfo(
3584       self.cfg.ExpandInstanceName(self.op.instance_name))
3585     if instance is None:
3586       raise errors.OpPrereqError("Instance '%s' not known" %
3587                                  self.op.instance_name)
3588     self.instance = instance
3589
3590   def Exec(self, feedback_fn):
3591     """Connect to the console of an instance
3592
3593     """
3594     instance = self.instance
3595     node = instance.primary_node
3596
3597     node_insts = rpc.call_instance_list([node])[node]
3598     if node_insts is False:
3599       raise errors.OpExecError("Can't connect to node %s." % node)
3600
3601     if instance.name not in node_insts:
3602       raise errors.OpExecError("Instance %s is not running." % instance.name)
3603
3604     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3605
3606     hyper = hypervisor.GetHypervisor()
3607     console_cmd = hyper.GetShellCommandForConsole(instance)
3608     # build ssh cmdline
3609     argv = ["ssh", "-q", "-t"]
3610     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3611     argv.extend(ssh.BATCH_MODE_OPTS)
3612     argv.append(node)
3613     argv.append(console_cmd)
3614     return "ssh", argv
3615
3616
3617 class LUAddMDDRBDComponent(LogicalUnit):
3618   """Adda new mirror member to an instance's disk.
3619
3620   """
3621   HPATH = "mirror-add"
3622   HTYPE = constants.HTYPE_INSTANCE
3623   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3624
3625   def BuildHooksEnv(self):
3626     """Build hooks env.
3627
3628     This runs on the master, the primary and all the secondaries.
3629
3630     """
3631     env = {
3632       "NEW_SECONDARY": self.op.remote_node,
3633       "DISK_NAME": self.op.disk_name,
3634       }
3635     env.update(_BuildInstanceHookEnvByObject(self.instance))
3636     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3637           self.op.remote_node,] + list(self.instance.secondary_nodes)
3638     return env, nl, nl
3639
3640   def CheckPrereq(self):
3641     """Check prerequisites.
3642
3643     This checks that the instance is in the cluster.
3644
3645     """
3646     instance = self.cfg.GetInstanceInfo(
3647       self.cfg.ExpandInstanceName(self.op.instance_name))
3648     if instance is None:
3649       raise errors.OpPrereqError("Instance '%s' not known" %
3650                                  self.op.instance_name)
3651     self.instance = instance
3652
3653     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3654     if remote_node is None:
3655       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3656     self.remote_node = remote_node
3657
3658     if remote_node == instance.primary_node:
3659       raise errors.OpPrereqError("The specified node is the primary node of"
3660                                  " the instance.")
3661
3662     if instance.disk_template != constants.DT_REMOTE_RAID1:
3663       raise errors.OpPrereqError("Instance's disk layout is not"
3664                                  " remote_raid1.")
3665     for disk in instance.disks:
3666       if disk.iv_name == self.op.disk_name:
3667         break
3668     else:
3669       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3670                                  " instance." % self.op.disk_name)
3671     if len(disk.children) > 1:
3672       raise errors.OpPrereqError("The device already has two slave devices."
3673                                  " This would create a 3-disk raid1 which we"
3674                                  " don't allow.")
3675     self.disk = disk
3676
3677   def Exec(self, feedback_fn):
3678     """Add the mirror component
3679
3680     """
3681     disk = self.disk
3682     instance = self.instance
3683
3684     remote_node = self.remote_node
3685     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3686     names = _GenerateUniqueNames(self.cfg, lv_names)
3687     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3688                                      remote_node, disk.size, names)
3689
3690     logger.Info("adding new mirror component on secondary")
3691     #HARDCODE
3692     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3693                                       new_drbd, False,
3694                                       _GetInstanceInfoText(instance)):
3695       raise errors.OpExecError("Failed to create new component on secondary"
3696                                " node %s" % remote_node)
3697
3698     logger.Info("adding new mirror component on primary")
3699     #HARDCODE
3700     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3701                                     instance, new_drbd,
3702                                     _GetInstanceInfoText(instance)):
3703       # remove secondary dev
3704       self.cfg.SetDiskID(new_drbd, remote_node)
3705       rpc.call_blockdev_remove(remote_node, new_drbd)
3706       raise errors.OpExecError("Failed to create volume on primary")
3707
3708     # the device exists now
3709     # call the primary node to add the mirror to md
3710     logger.Info("adding new mirror component to md")
3711     if not rpc.call_blockdev_addchildren(instance.primary_node,
3712                                          disk, [new_drbd]):
3713       logger.Error("Can't add mirror compoment to md!")
3714       self.cfg.SetDiskID(new_drbd, remote_node)
3715       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3716         logger.Error("Can't rollback on secondary")
3717       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3718       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3719         logger.Error("Can't rollback on primary")
3720       raise errors.OpExecError("Can't add mirror component to md array")
3721
3722     disk.children.append(new_drbd)
3723
3724     self.cfg.AddInstance(instance)
3725
3726     _WaitForSync(self.cfg, instance, self.proc)
3727
3728     return 0
3729
3730
3731 class LURemoveMDDRBDComponent(LogicalUnit):
3732   """Remove a component from a remote_raid1 disk.
3733
3734   """
3735   HPATH = "mirror-remove"
3736   HTYPE = constants.HTYPE_INSTANCE
3737   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3738
3739   def BuildHooksEnv(self):
3740     """Build hooks env.
3741
3742     This runs on the master, the primary and all the secondaries.
3743
3744     """
3745     env = {
3746       "DISK_NAME": self.op.disk_name,
3747       "DISK_ID": self.op.disk_id,
3748       "OLD_SECONDARY": self.old_secondary,
3749       }
3750     env.update(_BuildInstanceHookEnvByObject(self.instance))
3751     nl = [self.sstore.GetMasterNode(),
3752           self.instance.primary_node] + list(self.instance.secondary_nodes)
3753     return env, nl, nl
3754
3755   def CheckPrereq(self):
3756     """Check prerequisites.
3757
3758     This checks that the instance is in the cluster.
3759
3760     """
3761     instance = self.cfg.GetInstanceInfo(
3762       self.cfg.ExpandInstanceName(self.op.instance_name))
3763     if instance is None:
3764       raise errors.OpPrereqError("Instance '%s' not known" %
3765                                  self.op.instance_name)
3766     self.instance = instance
3767
3768     if instance.disk_template != constants.DT_REMOTE_RAID1:
3769       raise errors.OpPrereqError("Instance's disk layout is not"
3770                                  " remote_raid1.")
3771     for disk in instance.disks:
3772       if disk.iv_name == self.op.disk_name:
3773         break
3774     else:
3775       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3776                                  " instance." % self.op.disk_name)
3777     for child in disk.children:
3778       if (child.dev_type == constants.LD_DRBD7 and
3779           child.logical_id[2] == self.op.disk_id):
3780         break
3781     else:
3782       raise errors.OpPrereqError("Can't find the device with this port.")
3783
3784     if len(disk.children) < 2:
3785       raise errors.OpPrereqError("Cannot remove the last component from"
3786                                  " a mirror.")
3787     self.disk = disk
3788     self.child = child
3789     if self.child.logical_id[0] == instance.primary_node:
3790       oid = 1
3791     else:
3792       oid = 0
3793     self.old_secondary = self.child.logical_id[oid]
3794
3795   def Exec(self, feedback_fn):
3796     """Remove the mirror component
3797
3798     """
3799     instance = self.instance
3800     disk = self.disk
3801     child = self.child
3802     logger.Info("remove mirror component")
3803     self.cfg.SetDiskID(disk, instance.primary_node)
3804     if not rpc.call_blockdev_removechildren(instance.primary_node,
3805                                             disk, [child]):
3806       raise errors.OpExecError("Can't remove child from mirror.")
3807
3808     for node in child.logical_id[:2]:
3809       self.cfg.SetDiskID(child, node)
3810       if not rpc.call_blockdev_remove(node, child):
3811         logger.Error("Warning: failed to remove device from node %s,"
3812                      " continuing operation." % node)
3813
3814     disk.children.remove(child)
3815     self.cfg.AddInstance(instance)
3816
3817
3818 class LUReplaceDisks(LogicalUnit):
3819   """Replace the disks of an instance.
3820
3821   """
3822   HPATH = "mirrors-replace"
3823   HTYPE = constants.HTYPE_INSTANCE
3824   _OP_REQP = ["instance_name", "mode", "disks"]
3825
3826   def _RunAllocator(self):
3827     """Compute a new secondary node using an IAllocator.
3828
3829     """
3830     ial = IAllocator(self.cfg, self.sstore,
3831                      mode=constants.IALLOCATOR_MODE_RELOC,
3832                      name=self.op.instance_name,
3833                      relocate_from=[self.sec_node])
3834
3835     ial.Run(self.op.iallocator)
3836
3837     if not ial.success:
3838       raise errors.OpPrereqError("Can't compute nodes using"
3839                                  " iallocator '%s': %s" % (self.op.iallocator,
3840                                                            ial.info))
3841     if len(ial.nodes) != ial.required_nodes:
3842       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3843                                  " of nodes (%s), required %s" %
3844                                  (len(ial.nodes), ial.required_nodes))
3845     self.op.remote_node = ial.nodes[0]
3846     logger.ToStdout("Selected new secondary for the instance: %s" %
3847                     self.op.remote_node)
3848
3849   def BuildHooksEnv(self):
3850     """Build hooks env.
3851
3852     This runs on the master, the primary and all the secondaries.
3853
3854     """
3855     env = {
3856       "MODE": self.op.mode,
3857       "NEW_SECONDARY": self.op.remote_node,
3858       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3859       }
3860     env.update(_BuildInstanceHookEnvByObject(self.instance))
3861     nl = [
3862       self.sstore.GetMasterNode(),
3863       self.instance.primary_node,
3864       ]
3865     if self.op.remote_node is not None:
3866       nl.append(self.op.remote_node)
3867     return env, nl, nl
3868
3869   def CheckPrereq(self):
3870     """Check prerequisites.
3871
3872     This checks that the instance is in the cluster.
3873
3874     """
3875     if not hasattr(self.op, "remote_node"):
3876       self.op.remote_node = None
3877
3878     instance = self.cfg.GetInstanceInfo(
3879       self.cfg.ExpandInstanceName(self.op.instance_name))
3880     if instance is None:
3881       raise errors.OpPrereqError("Instance '%s' not known" %
3882                                  self.op.instance_name)
3883     self.instance = instance
3884     self.op.instance_name = instance.name
3885
3886     if instance.disk_template not in constants.DTS_NET_MIRROR:
3887       raise errors.OpPrereqError("Instance's disk layout is not"
3888                                  " network mirrored.")
3889
3890     if len(instance.secondary_nodes) != 1:
3891       raise errors.OpPrereqError("The instance has a strange layout,"
3892                                  " expected one secondary but found %d" %
3893                                  len(instance.secondary_nodes))
3894
3895     self.sec_node = instance.secondary_nodes[0]
3896
3897     ia_name = getattr(self.op, "iallocator", None)
3898     if ia_name is not None:
3899       if self.op.remote_node is not None:
3900         raise errors.OpPrereqError("Give either the iallocator or the new"
3901                                    " secondary, not both")
3902       self.op.remote_node = self._RunAllocator()
3903
3904     remote_node = self.op.remote_node
3905     if remote_node is not None:
3906       remote_node = self.cfg.ExpandNodeName(remote_node)
3907       if remote_node is None:
3908         raise errors.OpPrereqError("Node '%s' not known" %
3909                                    self.op.remote_node)
3910       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3911     else:
3912       self.remote_node_info = None
3913     if remote_node == instance.primary_node:
3914       raise errors.OpPrereqError("The specified node is the primary node of"
3915                                  " the instance.")
3916     elif remote_node == self.sec_node:
3917       if self.op.mode == constants.REPLACE_DISK_SEC:
3918         # this is for DRBD8, where we can't execute the same mode of
3919         # replacement as for drbd7 (no different port allocated)
3920         raise errors.OpPrereqError("Same secondary given, cannot execute"
3921                                    " replacement")
3922       # the user gave the current secondary, switch to
3923       # 'no-replace-secondary' mode for drbd7
3924       remote_node = None
3925     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3926         self.op.mode != constants.REPLACE_DISK_ALL):
3927       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3928                                  " disks replacement, not individual ones")
3929     if instance.disk_template == constants.DT_DRBD8:
3930       if (self.op.mode == constants.REPLACE_DISK_ALL and
3931           remote_node is not None):
3932         # switch to replace secondary mode
3933         self.op.mode = constants.REPLACE_DISK_SEC
3934
3935       if self.op.mode == constants.REPLACE_DISK_ALL:
3936         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3937                                    " secondary disk replacement, not"
3938                                    " both at once")
3939       elif self.op.mode == constants.REPLACE_DISK_PRI:
3940         if remote_node is not None:
3941           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3942                                      " the secondary while doing a primary"
3943                                      " node disk replacement")
3944         self.tgt_node = instance.primary_node
3945         self.oth_node = instance.secondary_nodes[0]
3946       elif self.op.mode == constants.REPLACE_DISK_SEC:
3947         self.new_node = remote_node # this can be None, in which case
3948                                     # we don't change the secondary
3949         self.tgt_node = instance.secondary_nodes[0]
3950         self.oth_node = instance.primary_node
3951       else:
3952         raise errors.ProgrammerError("Unhandled disk replace mode")
3953
3954     for name in self.op.disks:
3955       if instance.FindDisk(name) is None:
3956         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3957                                    (name, instance.name))
3958     self.op.remote_node = remote_node
3959
3960   def _ExecRR1(self, feedback_fn):
3961     """Replace the disks of an instance.
3962
3963     """
3964     instance = self.instance
3965     iv_names = {}
3966     # start of work
3967     if self.op.remote_node is None:
3968       remote_node = self.sec_node
3969     else:
3970       remote_node = self.op.remote_node
3971     cfg = self.cfg
3972     for dev in instance.disks:
3973       size = dev.size
3974       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3975       names = _GenerateUniqueNames(cfg, lv_names)
3976       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3977                                        remote_node, size, names)
3978       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3979       logger.Info("adding new mirror component on secondary for %s" %
3980                   dev.iv_name)
3981       #HARDCODE
3982       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3983                                         new_drbd, False,
3984                                         _GetInstanceInfoText(instance)):
3985         raise errors.OpExecError("Failed to create new component on secondary"
3986                                  " node %s. Full abort, cleanup manually!" %
3987                                  remote_node)
3988
3989       logger.Info("adding new mirror component on primary")
3990       #HARDCODE
3991       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3992                                       instance, new_drbd,
3993                                       _GetInstanceInfoText(instance)):
3994         # remove secondary dev
3995         cfg.SetDiskID(new_drbd, remote_node)
3996         rpc.call_blockdev_remove(remote_node, new_drbd)
3997         raise errors.OpExecError("Failed to create volume on primary!"
3998                                  " Full abort, cleanup manually!!")
3999
4000       # the device exists now
4001       # call the primary node to add the mirror to md
4002       logger.Info("adding new mirror component to md")
4003       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4004                                            [new_drbd]):
4005         logger.Error("Can't add mirror compoment to md!")
4006         cfg.SetDiskID(new_drbd, remote_node)
4007         if not rpc.call_blockdev_remove(remote_node, new_drbd):
4008           logger.Error("Can't rollback on secondary")
4009         cfg.SetDiskID(new_drbd, instance.primary_node)
4010         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4011           logger.Error("Can't rollback on primary")
4012         raise errors.OpExecError("Full abort, cleanup manually!!")
4013
4014       dev.children.append(new_drbd)
4015       cfg.AddInstance(instance)
4016
4017     # this can fail as the old devices are degraded and _WaitForSync
4018     # does a combined result over all disks, so we don't check its
4019     # return value
4020     _WaitForSync(cfg, instance, self.proc, unlock=True)
4021
4022     # so check manually all the devices
4023     for name in iv_names:
4024       dev, child, new_drbd = iv_names[name]
4025       cfg.SetDiskID(dev, instance.primary_node)
4026       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4027       if is_degr:
4028         raise errors.OpExecError("MD device %s is degraded!" % name)
4029       cfg.SetDiskID(new_drbd, instance.primary_node)
4030       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4031       if is_degr:
4032         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4033
4034     for name in iv_names:
4035       dev, child, new_drbd = iv_names[name]
4036       logger.Info("remove mirror %s component" % name)
4037       cfg.SetDiskID(dev, instance.primary_node)
4038       if not rpc.call_blockdev_removechildren(instance.primary_node,
4039                                               dev, [child]):
4040         logger.Error("Can't remove child from mirror, aborting"
4041                      " *this device cleanup*.\nYou need to cleanup manually!!")
4042         continue
4043
4044       for node in child.logical_id[:2]:
4045         logger.Info("remove child device on %s" % node)
4046         cfg.SetDiskID(child, node)
4047         if not rpc.call_blockdev_remove(node, child):
4048           logger.Error("Warning: failed to remove device from node %s,"
4049                        " continuing operation." % node)
4050
4051       dev.children.remove(child)
4052
4053       cfg.AddInstance(instance)
4054
4055   def _ExecD8DiskOnly(self, feedback_fn):
4056     """Replace a disk on the primary or secondary for dbrd8.
4057
4058     The algorithm for replace is quite complicated:
4059       - for each disk to be replaced:
4060         - create new LVs on the target node with unique names
4061         - detach old LVs from the drbd device
4062         - rename old LVs to name_replaced.<time_t>
4063         - rename new LVs to old LVs
4064         - attach the new LVs (with the old names now) to the drbd device
4065       - wait for sync across all devices
4066       - for each modified disk:
4067         - remove old LVs (which have the name name_replaces.<time_t>)
4068
4069     Failures are not very well handled.
4070
4071     """
4072     steps_total = 6
4073     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4074     instance = self.instance
4075     iv_names = {}
4076     vgname = self.cfg.GetVGName()
4077     # start of work
4078     cfg = self.cfg
4079     tgt_node = self.tgt_node
4080     oth_node = self.oth_node
4081
4082     # Step: check device activation
4083     self.proc.LogStep(1, steps_total, "check device existence")
4084     info("checking volume groups")
4085     my_vg = cfg.GetVGName()
4086     results = rpc.call_vg_list([oth_node, tgt_node])
4087     if not results:
4088       raise errors.OpExecError("Can't list volume groups on the nodes")
4089     for node in oth_node, tgt_node:
4090       res = results.get(node, False)
4091       if not res or my_vg not in res:
4092         raise errors.OpExecError("Volume group '%s' not found on %s" %
4093                                  (my_vg, node))
4094     for dev in instance.disks:
4095       if not dev.iv_name in self.op.disks:
4096         continue
4097       for node in tgt_node, oth_node:
4098         info("checking %s on %s" % (dev.iv_name, node))
4099         cfg.SetDiskID(dev, node)
4100         if not rpc.call_blockdev_find(node, dev):
4101           raise errors.OpExecError("Can't find device %s on node %s" %
4102                                    (dev.iv_name, node))
4103
4104     # Step: check other node consistency
4105     self.proc.LogStep(2, steps_total, "check peer consistency")
4106     for dev in instance.disks:
4107       if not dev.iv_name in self.op.disks:
4108         continue
4109       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4110       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4111                                    oth_node==instance.primary_node):
4112         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4113                                  " to replace disks on this node (%s)" %
4114                                  (oth_node, tgt_node))
4115
4116     # Step: create new storage
4117     self.proc.LogStep(3, steps_total, "allocate new storage")
4118     for dev in instance.disks:
4119       if not dev.iv_name in self.op.disks:
4120         continue
4121       size = dev.size
4122       cfg.SetDiskID(dev, tgt_node)
4123       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4124       names = _GenerateUniqueNames(cfg, lv_names)
4125       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4126                              logical_id=(vgname, names[0]))
4127       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4128                              logical_id=(vgname, names[1]))
4129       new_lvs = [lv_data, lv_meta]
4130       old_lvs = dev.children
4131       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4132       info("creating new local storage on %s for %s" %
4133            (tgt_node, dev.iv_name))
4134       # since we *always* want to create this LV, we use the
4135       # _Create...OnPrimary (which forces the creation), even if we
4136       # are talking about the secondary node
4137       for new_lv in new_lvs:
4138         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4139                                         _GetInstanceInfoText(instance)):
4140           raise errors.OpExecError("Failed to create new LV named '%s' on"
4141                                    " node '%s'" %
4142                                    (new_lv.logical_id[1], tgt_node))
4143
4144     # Step: for each lv, detach+rename*2+attach
4145     self.proc.LogStep(4, steps_total, "change drbd configuration")
4146     for dev, old_lvs, new_lvs in iv_names.itervalues():
4147       info("detaching %s drbd from local storage" % dev.iv_name)
4148       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4149         raise errors.OpExecError("Can't detach drbd from local storage on node"
4150                                  " %s for device %s" % (tgt_node, dev.iv_name))
4151       #dev.children = []
4152       #cfg.Update(instance)
4153
4154       # ok, we created the new LVs, so now we know we have the needed
4155       # storage; as such, we proceed on the target node to rename
4156       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4157       # using the assumption that logical_id == physical_id (which in
4158       # turn is the unique_id on that node)
4159
4160       # FIXME(iustin): use a better name for the replaced LVs
4161       temp_suffix = int(time.time())
4162       ren_fn = lambda d, suff: (d.physical_id[0],
4163                                 d.physical_id[1] + "_replaced-%s" % suff)
4164       # build the rename list based on what LVs exist on the node
4165       rlist = []
4166       for to_ren in old_lvs:
4167         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4168         if find_res is not None: # device exists
4169           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4170
4171       info("renaming the old LVs on the target node")
4172       if not rpc.call_blockdev_rename(tgt_node, rlist):
4173         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4174       # now we rename the new LVs to the old LVs
4175       info("renaming the new LVs on the target node")
4176       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4177       if not rpc.call_blockdev_rename(tgt_node, rlist):
4178         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4179
4180       for old, new in zip(old_lvs, new_lvs):
4181         new.logical_id = old.logical_id
4182         cfg.SetDiskID(new, tgt_node)
4183
4184       for disk in old_lvs:
4185         disk.logical_id = ren_fn(disk, temp_suffix)
4186         cfg.SetDiskID(disk, tgt_node)
4187
4188       # now that the new lvs have the old name, we can add them to the device
4189       info("adding new mirror component on %s" % tgt_node)
4190       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4191         for new_lv in new_lvs:
4192           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4193             warning("Can't rollback device %s", hint="manually cleanup unused"
4194                     " logical volumes")
4195         raise errors.OpExecError("Can't add local storage to drbd")
4196
4197       dev.children = new_lvs
4198       cfg.Update(instance)
4199
4200     # Step: wait for sync
4201
4202     # this can fail as the old devices are degraded and _WaitForSync
4203     # does a combined result over all disks, so we don't check its
4204     # return value
4205     self.proc.LogStep(5, steps_total, "sync devices")
4206     _WaitForSync(cfg, instance, self.proc, unlock=True)
4207
4208     # so check manually all the devices
4209     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4210       cfg.SetDiskID(dev, instance.primary_node)
4211       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4212       if is_degr:
4213         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4214
4215     # Step: remove old storage
4216     self.proc.LogStep(6, steps_total, "removing old storage")
4217     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4218       info("remove logical volumes for %s" % name)
4219       for lv in old_lvs:
4220         cfg.SetDiskID(lv, tgt_node)
4221         if not rpc.call_blockdev_remove(tgt_node, lv):
4222           warning("Can't remove old LV", hint="manually remove unused LVs")
4223           continue
4224
4225   def _ExecD8Secondary(self, feedback_fn):
4226     """Replace the secondary node for drbd8.
4227
4228     The algorithm for replace is quite complicated:
4229       - for all disks of the instance:
4230         - create new LVs on the new node with same names
4231         - shutdown the drbd device on the old secondary
4232         - disconnect the drbd network on the primary
4233         - create the drbd device on the new secondary
4234         - network attach the drbd on the primary, using an artifice:
4235           the drbd code for Attach() will connect to the network if it
4236           finds a device which is connected to the good local disks but
4237           not network enabled
4238       - wait for sync across all devices
4239       - remove all disks from the old secondary
4240
4241     Failures are not very well handled.
4242
4243     """
4244     steps_total = 6
4245     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4246     instance = self.instance
4247     iv_names = {}
4248     vgname = self.cfg.GetVGName()
4249     # start of work
4250     cfg = self.cfg
4251     old_node = self.tgt_node
4252     new_node = self.new_node
4253     pri_node = instance.primary_node
4254
4255     # Step: check device activation
4256     self.proc.LogStep(1, steps_total, "check device existence")
4257     info("checking volume groups")
4258     my_vg = cfg.GetVGName()
4259     results = rpc.call_vg_list([pri_node, new_node])
4260     if not results:
4261       raise errors.OpExecError("Can't list volume groups on the nodes")
4262     for node in pri_node, new_node:
4263       res = results.get(node, False)
4264       if not res or my_vg not in res:
4265         raise errors.OpExecError("Volume group '%s' not found on %s" %
4266                                  (my_vg, node))
4267     for dev in instance.disks:
4268       if not dev.iv_name in self.op.disks:
4269         continue
4270       info("checking %s on %s" % (dev.iv_name, pri_node))
4271       cfg.SetDiskID(dev, pri_node)
4272       if not rpc.call_blockdev_find(pri_node, dev):
4273         raise errors.OpExecError("Can't find device %s on node %s" %
4274                                  (dev.iv_name, pri_node))
4275
4276     # Step: check other node consistency
4277     self.proc.LogStep(2, steps_total, "check peer consistency")
4278     for dev in instance.disks:
4279       if not dev.iv_name in self.op.disks:
4280         continue
4281       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4282       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4283         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4284                                  " unsafe to replace the secondary" %
4285                                  pri_node)
4286
4287     # Step: create new storage
4288     self.proc.LogStep(3, steps_total, "allocate new storage")
4289     for dev in instance.disks:
4290       size = dev.size
4291       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4292       # since we *always* want to create this LV, we use the
4293       # _Create...OnPrimary (which forces the creation), even if we
4294       # are talking about the secondary node
4295       for new_lv in dev.children:
4296         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4297                                         _GetInstanceInfoText(instance)):
4298           raise errors.OpExecError("Failed to create new LV named '%s' on"
4299                                    " node '%s'" %
4300                                    (new_lv.logical_id[1], new_node))
4301
4302       iv_names[dev.iv_name] = (dev, dev.children)
4303
4304     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4305     for dev in instance.disks:
4306       size = dev.size
4307       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4308       # create new devices on new_node
4309       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4310                               logical_id=(pri_node, new_node,
4311                                           dev.logical_id[2]),
4312                               children=dev.children)
4313       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4314                                         new_drbd, False,
4315                                       _GetInstanceInfoText(instance)):
4316         raise errors.OpExecError("Failed to create new DRBD on"
4317                                  " node '%s'" % new_node)
4318
4319     for dev in instance.disks:
4320       # we have new devices, shutdown the drbd on the old secondary
4321       info("shutting down drbd for %s on old node" % dev.iv_name)
4322       cfg.SetDiskID(dev, old_node)
4323       if not rpc.call_blockdev_shutdown(old_node, dev):
4324         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4325                 hint="Please cleanup this device manually as soon as possible")
4326
4327     info("detaching primary drbds from the network (=> standalone)")
4328     done = 0
4329     for dev in instance.disks:
4330       cfg.SetDiskID(dev, pri_node)
4331       # set the physical (unique in bdev terms) id to None, meaning
4332       # detach from network
4333       dev.physical_id = (None,) * len(dev.physical_id)
4334       # and 'find' the device, which will 'fix' it to match the
4335       # standalone state
4336       if rpc.call_blockdev_find(pri_node, dev):
4337         done += 1
4338       else:
4339         warning("Failed to detach drbd %s from network, unusual case" %
4340                 dev.iv_name)
4341
4342     if not done:
4343       # no detaches succeeded (very unlikely)
4344       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4345
4346     # if we managed to detach at least one, we update all the disks of
4347     # the instance to point to the new secondary
4348     info("updating instance configuration")
4349     for dev in instance.disks:
4350       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4351       cfg.SetDiskID(dev, pri_node)
4352     cfg.Update(instance)
4353
4354     # and now perform the drbd attach
4355     info("attaching primary drbds to new secondary (standalone => connected)")
4356     failures = []
4357     for dev in instance.disks:
4358       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4359       # since the attach is smart, it's enough to 'find' the device,
4360       # it will automatically activate the network, if the physical_id
4361       # is correct
4362       cfg.SetDiskID(dev, pri_node)
4363       if not rpc.call_blockdev_find(pri_node, dev):
4364         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4365                 "please do a gnt-instance info to see the status of disks")
4366
4367     # this can fail as the old devices are degraded and _WaitForSync
4368     # does a combined result over all disks, so we don't check its
4369     # return value
4370     self.proc.LogStep(5, steps_total, "sync devices")
4371     _WaitForSync(cfg, instance, self.proc, unlock=True)
4372
4373     # so check manually all the devices
4374     for name, (dev, old_lvs) in iv_names.iteritems():
4375       cfg.SetDiskID(dev, pri_node)
4376       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4377       if is_degr:
4378         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4379
4380     self.proc.LogStep(6, steps_total, "removing old storage")
4381     for name, (dev, old_lvs) in iv_names.iteritems():
4382       info("remove logical volumes for %s" % name)
4383       for lv in old_lvs:
4384         cfg.SetDiskID(lv, old_node)
4385         if not rpc.call_blockdev_remove(old_node, lv):
4386           warning("Can't remove LV on old secondary",
4387                   hint="Cleanup stale volumes by hand")
4388
4389   def Exec(self, feedback_fn):
4390     """Execute disk replacement.
4391
4392     This dispatches the disk replacement to the appropriate handler.
4393
4394     """
4395     instance = self.instance
4396
4397     # Activate the instance disks if we're replacing them on a down instance
4398     if instance.status == "down":
4399       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4400       self.proc.ChainOpCode(op)
4401
4402     if instance.disk_template == constants.DT_REMOTE_RAID1:
4403       fn = self._ExecRR1
4404     elif instance.disk_template == constants.DT_DRBD8:
4405       if self.op.remote_node is None:
4406         fn = self._ExecD8DiskOnly
4407       else:
4408         fn = self._ExecD8Secondary
4409     else:
4410       raise errors.ProgrammerError("Unhandled disk replacement case")
4411
4412     ret = fn(feedback_fn)
4413
4414     # Deactivate the instance disks if we're replacing them on a down instance
4415     if instance.status == "down":
4416       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4417       self.proc.ChainOpCode(op)
4418
4419     return ret
4420
4421
4422 class LUGrowDisk(LogicalUnit):
4423   """Grow a disk of an instance.
4424
4425   """
4426   HPATH = "disk-grow"
4427   HTYPE = constants.HTYPE_INSTANCE
4428   _OP_REQP = ["instance_name", "disk", "amount"]
4429
4430   def BuildHooksEnv(self):
4431     """Build hooks env.
4432
4433     This runs on the master, the primary and all the secondaries.
4434
4435     """
4436     env = {
4437       "DISK": self.op.disk,
4438       "AMOUNT": self.op.amount,
4439       }
4440     env.update(_BuildInstanceHookEnvByObject(self.instance))
4441     nl = [
4442       self.sstore.GetMasterNode(),
4443       self.instance.primary_node,
4444       ]
4445     return env, nl, nl
4446
4447   def CheckPrereq(self):
4448     """Check prerequisites.
4449
4450     This checks that the instance is in the cluster.
4451
4452     """
4453     instance = self.cfg.GetInstanceInfo(
4454       self.cfg.ExpandInstanceName(self.op.instance_name))
4455     if instance is None:
4456       raise errors.OpPrereqError("Instance '%s' not known" %
4457                                  self.op.instance_name)
4458     self.instance = instance
4459     self.op.instance_name = instance.name
4460
4461     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4462       raise errors.OpPrereqError("Instance's disk layout does not support"
4463                                  " growing.")
4464
4465     if instance.FindDisk(self.op.disk) is None:
4466       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4467                                  (self.op.disk, instance.name))
4468
4469     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4470     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4471     for node in nodenames:
4472       info = nodeinfo.get(node, None)
4473       if not info:
4474         raise errors.OpPrereqError("Cannot get current information"
4475                                    " from node '%s'" % node)
4476       vg_free = info.get('vg_free', None)
4477       if not isinstance(vg_free, int):
4478         raise errors.OpPrereqError("Can't compute free disk space on"
4479                                    " node %s" % node)
4480       if self.op.amount > info['vg_free']:
4481         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4482                                    " %d MiB available, %d MiB required" %
4483                                    (node, info['vg_free'], self.op.amount))
4484
4485   def Exec(self, feedback_fn):
4486     """Execute disk grow.
4487
4488     """
4489     instance = self.instance
4490     disk = instance.FindDisk(self.op.disk)
4491     for node in (instance.secondary_nodes + (instance.primary_node,)):
4492       self.cfg.SetDiskID(disk, node)
4493       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4494       if not result or not isinstance(result, tuple) or len(result) != 2:
4495         raise errors.OpExecError("grow request failed to node %s" % node)
4496       elif not result[0]:
4497         raise errors.OpExecError("grow request failed to node %s: %s" %
4498                                  (node, result[1]))
4499     disk.RecordGrow(self.op.amount)
4500     self.cfg.Update(instance)
4501     return
4502
4503
4504 class LUQueryInstanceData(NoHooksLU):
4505   """Query runtime instance data.
4506
4507   """
4508   _OP_REQP = ["instances"]
4509
4510   def CheckPrereq(self):
4511     """Check prerequisites.
4512
4513     This only checks the optional instance list against the existing names.
4514
4515     """
4516     if not isinstance(self.op.instances, list):
4517       raise errors.OpPrereqError("Invalid argument type 'instances'")
4518     if self.op.instances:
4519       self.wanted_instances = []
4520       names = self.op.instances
4521       for name in names:
4522         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4523         if instance is None:
4524           raise errors.OpPrereqError("No such instance name '%s'" % name)
4525         self.wanted_instances.append(instance)
4526     else:
4527       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4528                                in self.cfg.GetInstanceList()]
4529     return
4530
4531
4532   def _ComputeDiskStatus(self, instance, snode, dev):
4533     """Compute block device status.
4534
4535     """
4536     self.cfg.SetDiskID(dev, instance.primary_node)
4537     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4538     if dev.dev_type in constants.LDS_DRBD:
4539       # we change the snode then (otherwise we use the one passed in)
4540       if dev.logical_id[0] == instance.primary_node:
4541         snode = dev.logical_id[1]
4542       else:
4543         snode = dev.logical_id[0]
4544
4545     if snode:
4546       self.cfg.SetDiskID(dev, snode)
4547       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4548     else:
4549       dev_sstatus = None
4550
4551     if dev.children:
4552       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4553                       for child in dev.children]
4554     else:
4555       dev_children = []
4556
4557     data = {
4558       "iv_name": dev.iv_name,
4559       "dev_type": dev.dev_type,
4560       "logical_id": dev.logical_id,
4561       "physical_id": dev.physical_id,
4562       "pstatus": dev_pstatus,
4563       "sstatus": dev_sstatus,
4564       "children": dev_children,
4565       }
4566
4567     return data
4568
4569   def Exec(self, feedback_fn):
4570     """Gather and return data"""
4571     result = {}
4572     for instance in self.wanted_instances:
4573       remote_info = rpc.call_instance_info(instance.primary_node,
4574                                                 instance.name)
4575       if remote_info and "state" in remote_info:
4576         remote_state = "up"
4577       else:
4578         remote_state = "down"
4579       if instance.status == "down":
4580         config_state = "down"
4581       else:
4582         config_state = "up"
4583
4584       disks = [self._ComputeDiskStatus(instance, None, device)
4585                for device in instance.disks]
4586
4587       idict = {
4588         "name": instance.name,
4589         "config_state": config_state,
4590         "run_state": remote_state,
4591         "pnode": instance.primary_node,
4592         "snodes": instance.secondary_nodes,
4593         "os": instance.os,
4594         "memory": instance.memory,
4595         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4596         "disks": disks,
4597         "vcpus": instance.vcpus,
4598         }
4599
4600       htkind = self.sstore.GetHypervisorType()
4601       if htkind == constants.HT_XEN_PVM30:
4602         idict["kernel_path"] = instance.kernel_path
4603         idict["initrd_path"] = instance.initrd_path
4604
4605       if htkind == constants.HT_XEN_HVM31:
4606         idict["hvm_boot_order"] = instance.hvm_boot_order
4607         idict["hvm_acpi"] = instance.hvm_acpi
4608         idict["hvm_pae"] = instance.hvm_pae
4609         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4610
4611       if htkind in constants.HTS_REQ_PORT:
4612         idict["vnc_bind_address"] = instance.vnc_bind_address
4613         idict["network_port"] = instance.network_port
4614
4615       result[instance.name] = idict
4616
4617     return result
4618
4619
4620 class LUSetInstanceParms(LogicalUnit):
4621   """Modifies an instances's parameters.
4622
4623   """
4624   HPATH = "instance-modify"
4625   HTYPE = constants.HTYPE_INSTANCE
4626   _OP_REQP = ["instance_name"]
4627
4628   def BuildHooksEnv(self):
4629     """Build hooks env.
4630
4631     This runs on the master, primary and secondaries.
4632
4633     """
4634     args = dict()
4635     if self.mem:
4636       args['memory'] = self.mem
4637     if self.vcpus:
4638       args['vcpus'] = self.vcpus
4639     if self.do_ip or self.do_bridge or self.mac:
4640       if self.do_ip:
4641         ip = self.ip
4642       else:
4643         ip = self.instance.nics[0].ip
4644       if self.bridge:
4645         bridge = self.bridge
4646       else:
4647         bridge = self.instance.nics[0].bridge
4648       if self.mac:
4649         mac = self.mac
4650       else:
4651         mac = self.instance.nics[0].mac
4652       args['nics'] = [(ip, bridge, mac)]
4653     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4654     nl = [self.sstore.GetMasterNode(),
4655           self.instance.primary_node] + list(self.instance.secondary_nodes)
4656     return env, nl, nl
4657
4658   def CheckPrereq(self):
4659     """Check prerequisites.
4660
4661     This only checks the instance list against the existing names.
4662
4663     """
4664     self.mem = getattr(self.op, "mem", None)
4665     self.vcpus = getattr(self.op, "vcpus", None)
4666     self.ip = getattr(self.op, "ip", None)
4667     self.mac = getattr(self.op, "mac", None)
4668     self.bridge = getattr(self.op, "bridge", None)
4669     self.kernel_path = getattr(self.op, "kernel_path", None)
4670     self.initrd_path = getattr(self.op, "initrd_path", None)
4671     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4672     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4673     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4674     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4675     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4676     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4677                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4678                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4679                  self.vnc_bind_address]
4680     if all_parms.count(None) == len(all_parms):
4681       raise errors.OpPrereqError("No changes submitted")
4682     if self.mem is not None:
4683       try:
4684         self.mem = int(self.mem)
4685       except ValueError, err:
4686         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4687     if self.vcpus is not None:
4688       try:
4689         self.vcpus = int(self.vcpus)
4690       except ValueError, err:
4691         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4692     if self.ip is not None:
4693       self.do_ip = True
4694       if self.ip.lower() == "none":
4695         self.ip = None
4696       else:
4697         if not utils.IsValidIP(self.ip):
4698           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4699     else:
4700       self.do_ip = False
4701     self.do_bridge = (self.bridge is not None)
4702     if self.mac is not None:
4703       if self.cfg.IsMacInUse(self.mac):
4704         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4705                                    self.mac)
4706       if not utils.IsValidMac(self.mac):
4707         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4708
4709     if self.kernel_path is not None:
4710       self.do_kernel_path = True
4711       if self.kernel_path == constants.VALUE_NONE:
4712         raise errors.OpPrereqError("Can't set instance to no kernel")
4713
4714       if self.kernel_path != constants.VALUE_DEFAULT:
4715         if not os.path.isabs(self.kernel_path):
4716           raise errors.OpPrereqError("The kernel path must be an absolute"
4717                                     " filename")
4718     else:
4719       self.do_kernel_path = False
4720
4721     if self.initrd_path is not None:
4722       self.do_initrd_path = True
4723       if self.initrd_path not in (constants.VALUE_NONE,
4724                                   constants.VALUE_DEFAULT):
4725         if not os.path.isabs(self.initrd_path):
4726           raise errors.OpPrereqError("The initrd path must be an absolute"
4727                                     " filename")
4728     else:
4729       self.do_initrd_path = False
4730
4731     # boot order verification
4732     if self.hvm_boot_order is not None:
4733       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4734         if len(self.hvm_boot_order.strip("acdn")) != 0:
4735           raise errors.OpPrereqError("invalid boot order specified,"
4736                                      " must be one or more of [acdn]"
4737                                      " or 'default'")
4738
4739     # hvm_cdrom_image_path verification
4740     if self.op.hvm_cdrom_image_path is not None:
4741       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4742               self.op.hvm_cdrom_image_path.lower() == "none"):
4743         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4744                                    " be an absolute path or None, not %s" %
4745                                    self.op.hvm_cdrom_image_path)
4746       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4747               self.op.hvm_cdrom_image_path.lower() == "none"):
4748         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4749                                    " regular file or a symlink pointing to"
4750                                    " an existing regular file, not %s" %
4751                                    self.op.hvm_cdrom_image_path)
4752
4753     # vnc_bind_address verification
4754     if self.op.vnc_bind_address is not None:
4755       if not utils.IsValidIP(self.op.vnc_bind_address):
4756         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4757                                    " like a valid IP address" %
4758                                    self.op.vnc_bind_address)
4759
4760     instance = self.cfg.GetInstanceInfo(
4761       self.cfg.ExpandInstanceName(self.op.instance_name))
4762     if instance is None:
4763       raise errors.OpPrereqError("No such instance name '%s'" %
4764                                  self.op.instance_name)
4765     self.op.instance_name = instance.name
4766     self.instance = instance
4767     return
4768
4769   def Exec(self, feedback_fn):
4770     """Modifies an instance.
4771
4772     All parameters take effect only at the next restart of the instance.
4773     """
4774     result = []
4775     instance = self.instance
4776     if self.mem:
4777       instance.memory = self.mem
4778       result.append(("mem", self.mem))
4779     if self.vcpus:
4780       instance.vcpus = self.vcpus
4781       result.append(("vcpus",  self.vcpus))
4782     if self.do_ip:
4783       instance.nics[0].ip = self.ip
4784       result.append(("ip", self.ip))
4785     if self.bridge:
4786       instance.nics[0].bridge = self.bridge
4787       result.append(("bridge", self.bridge))
4788     if self.mac:
4789       instance.nics[0].mac = self.mac
4790       result.append(("mac", self.mac))
4791     if self.do_kernel_path:
4792       instance.kernel_path = self.kernel_path
4793       result.append(("kernel_path", self.kernel_path))
4794     if self.do_initrd_path:
4795       instance.initrd_path = self.initrd_path
4796       result.append(("initrd_path", self.initrd_path))
4797     if self.hvm_boot_order:
4798       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4799         instance.hvm_boot_order = None
4800       else:
4801         instance.hvm_boot_order = self.hvm_boot_order
4802       result.append(("hvm_boot_order", self.hvm_boot_order))
4803     if self.hvm_acpi is not None:
4804       instance.hvm_acpi = self.hvm_acpi
4805       result.append(("hvm_acpi", self.hvm_acpi))
4806     if self.hvm_pae is not None:
4807       instance.hvm_pae = self.hvm_pae
4808       result.append(("hvm_pae", self.hvm_pae))
4809     if self.hvm_cdrom_image_path:
4810       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4811         instance.hvm_cdrom_image_path = None
4812       else:
4813         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4814       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4815     if self.vnc_bind_address:
4816       instance.vnc_bind_address = self.vnc_bind_address
4817       result.append(("vnc_bind_address", self.vnc_bind_address))
4818
4819     self.cfg.AddInstance(instance)
4820
4821     return result
4822
4823
4824 class LUQueryExports(NoHooksLU):
4825   """Query the exports list
4826
4827   """
4828   _OP_REQP = []
4829
4830   def CheckPrereq(self):
4831     """Check that the nodelist contains only existing nodes.
4832
4833     """
4834     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4835
4836   def Exec(self, feedback_fn):
4837     """Compute the list of all the exported system images.
4838
4839     Returns:
4840       a dictionary with the structure node->(export-list)
4841       where export-list is a list of the instances exported on
4842       that node.
4843
4844     """
4845     return rpc.call_export_list(self.nodes)
4846
4847
4848 class LUExportInstance(LogicalUnit):
4849   """Export an instance to an image in the cluster.
4850
4851   """
4852   HPATH = "instance-export"
4853   HTYPE = constants.HTYPE_INSTANCE
4854   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4855
4856   def BuildHooksEnv(self):
4857     """Build hooks env.
4858
4859     This will run on the master, primary node and target node.
4860
4861     """
4862     env = {
4863       "EXPORT_NODE": self.op.target_node,
4864       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4865       }
4866     env.update(_BuildInstanceHookEnvByObject(self.instance))
4867     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4868           self.op.target_node]
4869     return env, nl, nl
4870
4871   def CheckPrereq(self):
4872     """Check prerequisites.
4873
4874     This checks that the instance and node names are valid.
4875
4876     """
4877     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4878     self.instance = self.cfg.GetInstanceInfo(instance_name)
4879     if self.instance is None:
4880       raise errors.OpPrereqError("Instance '%s' not found" %
4881                                  self.op.instance_name)
4882
4883     # node verification
4884     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4885     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4886
4887     if self.dst_node is None:
4888       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4889                                  self.op.target_node)
4890     self.op.target_node = self.dst_node.name
4891
4892   def Exec(self, feedback_fn):
4893     """Export an instance to an image in the cluster.
4894
4895     """
4896     instance = self.instance
4897     dst_node = self.dst_node
4898     src_node = instance.primary_node
4899     if self.op.shutdown:
4900       # shutdown the instance, but not the disks
4901       if not rpc.call_instance_shutdown(src_node, instance):
4902         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4903                                  (instance.name, src_node))
4904
4905     vgname = self.cfg.GetVGName()
4906
4907     snap_disks = []
4908
4909     try:
4910       for disk in instance.disks:
4911         if disk.iv_name == "sda":
4912           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4913           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4914
4915           if not new_dev_name:
4916             logger.Error("could not snapshot block device %s on node %s" %
4917                          (disk.logical_id[1], src_node))
4918           else:
4919             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4920                                       logical_id=(vgname, new_dev_name),
4921                                       physical_id=(vgname, new_dev_name),
4922                                       iv_name=disk.iv_name)
4923             snap_disks.append(new_dev)
4924
4925     finally:
4926       if self.op.shutdown and instance.status == "up":
4927         if not rpc.call_instance_start(src_node, instance, None):
4928           _ShutdownInstanceDisks(instance, self.cfg)
4929           raise errors.OpExecError("Could not start instance")
4930
4931     # TODO: check for size
4932
4933     for dev in snap_disks:
4934       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4935                                            instance):
4936         logger.Error("could not export block device %s from node"
4937                      " %s to node %s" %
4938                      (dev.logical_id[1], src_node, dst_node.name))
4939       if not rpc.call_blockdev_remove(src_node, dev):
4940         logger.Error("could not remove snapshot block device %s from"
4941                      " node %s" % (dev.logical_id[1], src_node))
4942
4943     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4944       logger.Error("could not finalize export for instance %s on node %s" %
4945                    (instance.name, dst_node.name))
4946
4947     nodelist = self.cfg.GetNodeList()
4948     nodelist.remove(dst_node.name)
4949
4950     # on one-node clusters nodelist will be empty after the removal
4951     # if we proceed the backup would be removed because OpQueryExports
4952     # substitutes an empty list with the full cluster node list.
4953     if nodelist:
4954       op = opcodes.OpQueryExports(nodes=nodelist)
4955       exportlist = self.proc.ChainOpCode(op)
4956       for node in exportlist:
4957         if instance.name in exportlist[node]:
4958           if not rpc.call_export_remove(node, instance.name):
4959             logger.Error("could not remove older export for instance %s"
4960                          " on node %s" % (instance.name, node))
4961
4962
4963 class LURemoveExport(NoHooksLU):
4964   """Remove exports related to the named instance.
4965
4966   """
4967   _OP_REQP = ["instance_name"]
4968
4969   def CheckPrereq(self):
4970     """Check prerequisites.
4971     """
4972     pass
4973
4974   def Exec(self, feedback_fn):
4975     """Remove any export.
4976
4977     """
4978     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4979     # If the instance was not found we'll try with the name that was passed in.
4980     # This will only work if it was an FQDN, though.
4981     fqdn_warn = False
4982     if not instance_name:
4983       fqdn_warn = True
4984       instance_name = self.op.instance_name
4985
4986     op = opcodes.OpQueryExports(nodes=[])
4987     exportlist = self.proc.ChainOpCode(op)
4988     found = False
4989     for node in exportlist:
4990       if instance_name in exportlist[node]:
4991         found = True
4992         if not rpc.call_export_remove(node, instance_name):
4993           logger.Error("could not remove export for instance %s"
4994                        " on node %s" % (instance_name, node))
4995
4996     if fqdn_warn and not found:
4997       feedback_fn("Export not found. If trying to remove an export belonging"
4998                   " to a deleted instance please use its Fully Qualified"
4999                   " Domain Name.")
5000
5001
5002 class TagsLU(NoHooksLU):
5003   """Generic tags LU.
5004
5005   This is an abstract class which is the parent of all the other tags LUs.
5006
5007   """
5008   def CheckPrereq(self):
5009     """Check prerequisites.
5010
5011     """
5012     if self.op.kind == constants.TAG_CLUSTER:
5013       self.target = self.cfg.GetClusterInfo()
5014     elif self.op.kind == constants.TAG_NODE:
5015       name = self.cfg.ExpandNodeName(self.op.name)
5016       if name is None:
5017         raise errors.OpPrereqError("Invalid node name (%s)" %
5018                                    (self.op.name,))
5019       self.op.name = name
5020       self.target = self.cfg.GetNodeInfo(name)
5021     elif self.op.kind == constants.TAG_INSTANCE:
5022       name = self.cfg.ExpandInstanceName(self.op.name)
5023       if name is None:
5024         raise errors.OpPrereqError("Invalid instance name (%s)" %
5025                                    (self.op.name,))
5026       self.op.name = name
5027       self.target = self.cfg.GetInstanceInfo(name)
5028     else:
5029       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5030                                  str(self.op.kind))
5031
5032
5033 class LUGetTags(TagsLU):
5034   """Returns the tags of a given object.
5035
5036   """
5037   _OP_REQP = ["kind", "name"]
5038
5039   def Exec(self, feedback_fn):
5040     """Returns the tag list.
5041
5042     """
5043     return self.target.GetTags()
5044
5045
5046 class LUSearchTags(NoHooksLU):
5047   """Searches the tags for a given pattern.
5048
5049   """
5050   _OP_REQP = ["pattern"]
5051
5052   def CheckPrereq(self):
5053     """Check prerequisites.
5054
5055     This checks the pattern passed for validity by compiling it.
5056
5057     """
5058     try:
5059       self.re = re.compile(self.op.pattern)
5060     except re.error, err:
5061       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5062                                  (self.op.pattern, err))
5063
5064   def Exec(self, feedback_fn):
5065     """Returns the tag list.
5066
5067     """
5068     cfg = self.cfg
5069     tgts = [("/cluster", cfg.GetClusterInfo())]
5070     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5071     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5072     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5073     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5074     results = []
5075     for path, target in tgts:
5076       for tag in target.GetTags():
5077         if self.re.search(tag):
5078           results.append((path, tag))
5079     return results
5080
5081
5082 class LUAddTags(TagsLU):
5083   """Sets a tag on a given object.
5084
5085   """
5086   _OP_REQP = ["kind", "name", "tags"]
5087
5088   def CheckPrereq(self):
5089     """Check prerequisites.
5090
5091     This checks the type and length of the tag name and value.
5092
5093     """
5094     TagsLU.CheckPrereq(self)
5095     for tag in self.op.tags:
5096       objects.TaggableObject.ValidateTag(tag)
5097
5098   def Exec(self, feedback_fn):
5099     """Sets the tag.
5100
5101     """
5102     try:
5103       for tag in self.op.tags:
5104         self.target.AddTag(tag)
5105     except errors.TagError, err:
5106       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5107     try:
5108       self.cfg.Update(self.target)
5109     except errors.ConfigurationError:
5110       raise errors.OpRetryError("There has been a modification to the"
5111                                 " config file and the operation has been"
5112                                 " aborted. Please retry.")
5113
5114
5115 class LUDelTags(TagsLU):
5116   """Delete a list of tags from a given object.
5117
5118   """
5119   _OP_REQP = ["kind", "name", "tags"]
5120
5121   def CheckPrereq(self):
5122     """Check prerequisites.
5123
5124     This checks that we have the given tag.
5125
5126     """
5127     TagsLU.CheckPrereq(self)
5128     for tag in self.op.tags:
5129       objects.TaggableObject.ValidateTag(tag)
5130     del_tags = frozenset(self.op.tags)
5131     cur_tags = self.target.GetTags()
5132     if not del_tags <= cur_tags:
5133       diff_tags = del_tags - cur_tags
5134       diff_names = ["'%s'" % tag for tag in diff_tags]
5135       diff_names.sort()
5136       raise errors.OpPrereqError("Tag(s) %s not found" %
5137                                  (",".join(diff_names)))
5138
5139   def Exec(self, feedback_fn):
5140     """Remove the tag from the object.
5141
5142     """
5143     for tag in self.op.tags:
5144       self.target.RemoveTag(tag)
5145     try:
5146       self.cfg.Update(self.target)
5147     except errors.ConfigurationError:
5148       raise errors.OpRetryError("There has been a modification to the"
5149                                 " config file and the operation has been"
5150                                 " aborted. Please retry.")
5151
5152 class LUTestDelay(NoHooksLU):
5153   """Sleep for a specified amount of time.
5154
5155   This LU sleeps on the master and/or nodes for a specified amoutn of
5156   time.
5157
5158   """
5159   _OP_REQP = ["duration", "on_master", "on_nodes"]
5160
5161   def CheckPrereq(self):
5162     """Check prerequisites.
5163
5164     This checks that we have a good list of nodes and/or the duration
5165     is valid.
5166
5167     """
5168
5169     if self.op.on_nodes:
5170       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5171
5172   def Exec(self, feedback_fn):
5173     """Do the actual sleep.
5174
5175     """
5176     if self.op.on_master:
5177       if not utils.TestDelay(self.op.duration):
5178         raise errors.OpExecError("Error during master delay test")
5179     if self.op.on_nodes:
5180       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5181       if not result:
5182         raise errors.OpExecError("Complete failure from rpc call")
5183       for node, node_result in result.items():
5184         if not node_result:
5185           raise errors.OpExecError("Failure during rpc call to node %s,"
5186                                    " result: %s" % (node, node_result))
5187
5188
5189 class IAllocator(object):
5190   """IAllocator framework.
5191
5192   An IAllocator instance has three sets of attributes:
5193     - cfg/sstore that are needed to query the cluster
5194     - input data (all members of the _KEYS class attribute are required)
5195     - four buffer attributes (in|out_data|text), that represent the
5196       input (to the external script) in text and data structure format,
5197       and the output from it, again in two formats
5198     - the result variables from the script (success, info, nodes) for
5199       easy usage
5200
5201   """
5202   _ALLO_KEYS = [
5203     "mem_size", "disks", "disk_template",
5204     "os", "tags", "nics", "vcpus",
5205     ]
5206   _RELO_KEYS = [
5207     "relocate_from",
5208     ]
5209
5210   def __init__(self, cfg, sstore, mode, name, **kwargs):
5211     self.cfg = cfg
5212     self.sstore = sstore
5213     # init buffer variables
5214     self.in_text = self.out_text = self.in_data = self.out_data = None
5215     # init all input fields so that pylint is happy
5216     self.mode = mode
5217     self.name = name
5218     self.mem_size = self.disks = self.disk_template = None
5219     self.os = self.tags = self.nics = self.vcpus = None
5220     self.relocate_from = None
5221     # computed fields
5222     self.required_nodes = None
5223     # init result fields
5224     self.success = self.info = self.nodes = None
5225     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5226       keyset = self._ALLO_KEYS
5227     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5228       keyset = self._RELO_KEYS
5229     else:
5230       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5231                                    " IAllocator" % self.mode)
5232     for key in kwargs:
5233       if key not in keyset:
5234         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5235                                      " IAllocator" % key)
5236       setattr(self, key, kwargs[key])
5237     for key in keyset:
5238       if key not in kwargs:
5239         raise errors.ProgrammerError("Missing input parameter '%s' to"
5240                                      " IAllocator" % key)
5241     self._BuildInputData()
5242
5243   def _ComputeClusterData(self):
5244     """Compute the generic allocator input data.
5245
5246     This is the data that is independent of the actual operation.
5247
5248     """
5249     cfg = self.cfg
5250     # cluster data
5251     data = {
5252       "version": 1,
5253       "cluster_name": self.sstore.GetClusterName(),
5254       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5255       "hypervisor_type": self.sstore.GetHypervisorType(),
5256       # we don't have job IDs
5257       }
5258
5259     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5260
5261     # node data
5262     node_results = {}
5263     node_list = cfg.GetNodeList()
5264     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5265     for nname in node_list:
5266       ninfo = cfg.GetNodeInfo(nname)
5267       if nname not in node_data or not isinstance(node_data[nname], dict):
5268         raise errors.OpExecError("Can't get data for node %s" % nname)
5269       remote_info = node_data[nname]
5270       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5271                    'vg_size', 'vg_free', 'cpu_total']:
5272         if attr not in remote_info:
5273           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5274                                    (nname, attr))
5275         try:
5276           remote_info[attr] = int(remote_info[attr])
5277         except ValueError, err:
5278           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5279                                    " %s" % (nname, attr, str(err)))
5280       # compute memory used by primary instances
5281       i_p_mem = i_p_up_mem = 0
5282       for iinfo in i_list:
5283         if iinfo.primary_node == nname:
5284           i_p_mem += iinfo.memory
5285           if iinfo.status == "up":
5286             i_p_up_mem += iinfo.memory
5287
5288       # compute memory used by instances
5289       pnr = {
5290         "tags": list(ninfo.GetTags()),
5291         "total_memory": remote_info['memory_total'],
5292         "reserved_memory": remote_info['memory_dom0'],
5293         "free_memory": remote_info['memory_free'],
5294         "i_pri_memory": i_p_mem,
5295         "i_pri_up_memory": i_p_up_mem,
5296         "total_disk": remote_info['vg_size'],
5297         "free_disk": remote_info['vg_free'],
5298         "primary_ip": ninfo.primary_ip,
5299         "secondary_ip": ninfo.secondary_ip,
5300         "total_cpus": remote_info['cpu_total'],
5301         }
5302       node_results[nname] = pnr
5303     data["nodes"] = node_results
5304
5305     # instance data
5306     instance_data = {}
5307     for iinfo in i_list:
5308       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5309                   for n in iinfo.nics]
5310       pir = {
5311         "tags": list(iinfo.GetTags()),
5312         "should_run": iinfo.status == "up",
5313         "vcpus": iinfo.vcpus,
5314         "memory": iinfo.memory,
5315         "os": iinfo.os,
5316         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5317         "nics": nic_data,
5318         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5319         "disk_template": iinfo.disk_template,
5320         }
5321       instance_data[iinfo.name] = pir
5322
5323     data["instances"] = instance_data
5324
5325     self.in_data = data
5326
5327   def _AddNewInstance(self):
5328     """Add new instance data to allocator structure.
5329
5330     This in combination with _AllocatorGetClusterData will create the
5331     correct structure needed as input for the allocator.
5332
5333     The checks for the completeness of the opcode must have already been
5334     done.
5335
5336     """
5337     data = self.in_data
5338     if len(self.disks) != 2:
5339       raise errors.OpExecError("Only two-disk configurations supported")
5340
5341     disk_space = _ComputeDiskSize(self.disk_template,
5342                                   self.disks[0]["size"], self.disks[1]["size"])
5343
5344     if self.disk_template in constants.DTS_NET_MIRROR:
5345       self.required_nodes = 2
5346     else:
5347       self.required_nodes = 1
5348     request = {
5349       "type": "allocate",
5350       "name": self.name,
5351       "disk_template": self.disk_template,
5352       "tags": self.tags,
5353       "os": self.os,
5354       "vcpus": self.vcpus,
5355       "memory": self.mem_size,
5356       "disks": self.disks,
5357       "disk_space_total": disk_space,
5358       "nics": self.nics,
5359       "required_nodes": self.required_nodes,
5360       }
5361     data["request"] = request
5362
5363   def _AddRelocateInstance(self):
5364     """Add relocate instance data to allocator structure.
5365
5366     This in combination with _IAllocatorGetClusterData will create the
5367     correct structure needed as input for the allocator.
5368
5369     The checks for the completeness of the opcode must have already been
5370     done.
5371
5372     """
5373     instance = self.cfg.GetInstanceInfo(self.name)
5374     if instance is None:
5375       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5376                                    " IAllocator" % self.name)
5377
5378     if instance.disk_template not in constants.DTS_NET_MIRROR:
5379       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5380
5381     if len(instance.secondary_nodes) != 1:
5382       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5383
5384     self.required_nodes = 1
5385
5386     disk_space = _ComputeDiskSize(instance.disk_template,
5387                                   instance.disks[0].size,
5388                                   instance.disks[1].size)
5389
5390     request = {
5391       "type": "relocate",
5392       "name": self.name,
5393       "disk_space_total": disk_space,
5394       "required_nodes": self.required_nodes,
5395       "relocate_from": self.relocate_from,
5396       }
5397     self.in_data["request"] = request
5398
5399   def _BuildInputData(self):
5400     """Build input data structures.
5401
5402     """
5403     self._ComputeClusterData()
5404
5405     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5406       self._AddNewInstance()
5407     else:
5408       self._AddRelocateInstance()
5409
5410     self.in_text = serializer.Dump(self.in_data)
5411
5412   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5413     """Run an instance allocator and return the results.
5414
5415     """
5416     data = self.in_text
5417
5418     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5419
5420     if not isinstance(result, tuple) or len(result) != 4:
5421       raise errors.OpExecError("Invalid result from master iallocator runner")
5422
5423     rcode, stdout, stderr, fail = result
5424
5425     if rcode == constants.IARUN_NOTFOUND:
5426       raise errors.OpExecError("Can't find allocator '%s'" % name)
5427     elif rcode == constants.IARUN_FAILURE:
5428         raise errors.OpExecError("Instance allocator call failed: %s,"
5429                                  " output: %s" %
5430                                  (fail, stdout+stderr))
5431     self.out_text = stdout
5432     if validate:
5433       self._ValidateResult()
5434
5435   def _ValidateResult(self):
5436     """Process the allocator results.
5437
5438     This will process and if successful save the result in
5439     self.out_data and the other parameters.
5440
5441     """
5442     try:
5443       rdict = serializer.Load(self.out_text)
5444     except Exception, err:
5445       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5446
5447     if not isinstance(rdict, dict):
5448       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5449
5450     for key in "success", "info", "nodes":
5451       if key not in rdict:
5452         raise errors.OpExecError("Can't parse iallocator results:"
5453                                  " missing key '%s'" % key)
5454       setattr(self, key, rdict[key])
5455
5456     if not isinstance(rdict["nodes"], list):
5457       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5458                                " is not a list")
5459     self.out_data = rdict
5460
5461
5462 class LUTestAllocator(NoHooksLU):
5463   """Run allocator tests.
5464
5465   This LU runs the allocator tests
5466
5467   """
5468   _OP_REQP = ["direction", "mode", "name"]
5469
5470   def CheckPrereq(self):
5471     """Check prerequisites.
5472
5473     This checks the opcode parameters depending on the director and mode test.
5474
5475     """
5476     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5477       for attr in ["name", "mem_size", "disks", "disk_template",
5478                    "os", "tags", "nics", "vcpus"]:
5479         if not hasattr(self.op, attr):
5480           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5481                                      attr)
5482       iname = self.cfg.ExpandInstanceName(self.op.name)
5483       if iname is not None:
5484         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5485                                    iname)
5486       if not isinstance(self.op.nics, list):
5487         raise errors.OpPrereqError("Invalid parameter 'nics'")
5488       for row in self.op.nics:
5489         if (not isinstance(row, dict) or
5490             "mac" not in row or
5491             "ip" not in row or
5492             "bridge" not in row):
5493           raise errors.OpPrereqError("Invalid contents of the"
5494                                      " 'nics' parameter")
5495       if not isinstance(self.op.disks, list):
5496         raise errors.OpPrereqError("Invalid parameter 'disks'")
5497       if len(self.op.disks) != 2:
5498         raise errors.OpPrereqError("Only two-disk configurations supported")
5499       for row in self.op.disks:
5500         if (not isinstance(row, dict) or
5501             "size" not in row or
5502             not isinstance(row["size"], int) or
5503             "mode" not in row or
5504             row["mode"] not in ['r', 'w']):
5505           raise errors.OpPrereqError("Invalid contents of the"
5506                                      " 'disks' parameter")
5507     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5508       if not hasattr(self.op, "name"):
5509         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5510       fname = self.cfg.ExpandInstanceName(self.op.name)
5511       if fname is None:
5512         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5513                                    self.op.name)
5514       self.op.name = fname
5515       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5516     else:
5517       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5518                                  self.op.mode)
5519
5520     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5521       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5522         raise errors.OpPrereqError("Missing allocator name")
5523     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5524       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5525                                  self.op.direction)
5526
5527   def Exec(self, feedback_fn):
5528     """Run the allocator test.
5529
5530     """
5531     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5532       ial = IAllocator(self.cfg, self.sstore,
5533                        mode=self.op.mode,
5534                        name=self.op.name,
5535                        mem_size=self.op.mem_size,
5536                        disks=self.op.disks,
5537                        disk_template=self.op.disk_template,
5538                        os=self.op.os,
5539                        tags=self.op.tags,
5540                        nics=self.op.nics,
5541                        vcpus=self.op.vcpus,
5542                        )
5543     else:
5544       ial = IAllocator(self.cfg, self.sstore,
5545                        mode=self.op.mode,
5546                        name=self.op.name,
5547                        relocate_from=list(self.relocate_from),
5548                        )
5549
5550     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5551       result = ial.in_text
5552     else:
5553       ial.Run(self.op.allocator, validate=False)
5554       result = ial.out_text
5555     return result