Implement disk grow at LU level
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           needed_mem += instance_cfg[instance].memory
836         if nodeinfo['mfree'] < needed_mem:
837           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
838                       " failovers should node %s fail" % (node, prinode))
839           bad = True
840     return bad
841
842   def CheckPrereq(self):
843     """Check prerequisites.
844
845     Transform the list of checks we're going to skip into a set and check that
846     all its members are valid.
847
848     """
849     self.skip_set = frozenset(self.op.skip_checks)
850     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851       raise errors.OpPrereqError("Invalid checks to be skipped specified")
852
853   def BuildHooksEnv(self):
854     """Build hooks env.
855
856     Cluster-Verify hooks just rone in the post phase and their failure makes
857     the output be logged in the verify output and the verification to fail.
858
859     """
860     all_nodes = self.cfg.GetNodeList()
861     # TODO: populate the environment with useful information for verify hooks
862     env = {}
863     return env, [], all_nodes
864
865   def Exec(self, feedback_fn):
866     """Verify integrity of cluster, performing various test on nodes.
867
868     """
869     bad = False
870     feedback_fn("* Verifying global settings")
871     for msg in self.cfg.VerifyConfig():
872       feedback_fn("  - ERROR: %s" % msg)
873
874     vg_name = self.cfg.GetVGName()
875     nodelist = utils.NiceSort(self.cfg.GetNodeList())
876     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
877     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
878     i_non_redundant = [] # Non redundant instances
879     node_volume = {}
880     node_instance = {}
881     node_info = {}
882     instance_cfg = {}
883
884     # FIXME: verify OS list
885     # do local checksums
886     file_names = list(self.sstore.GetFileList())
887     file_names.append(constants.SSL_CERT_FILE)
888     file_names.append(constants.CLUSTER_CONF_FILE)
889     local_checksums = utils.FingerprintFiles(file_names)
890
891     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
892     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
893     all_instanceinfo = rpc.call_instance_list(nodelist)
894     all_vglist = rpc.call_vg_list(nodelist)
895     node_verify_param = {
896       'filelist': file_names,
897       'nodelist': nodelist,
898       'hypervisor': None,
899       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
900                         for node in nodeinfo]
901       }
902     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
903     all_rversion = rpc.call_version(nodelist)
904     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
905
906     for node in nodelist:
907       feedback_fn("* Verifying node %s" % node)
908       result = self._VerifyNode(node, file_names, local_checksums,
909                                 all_vglist[node], all_nvinfo[node],
910                                 all_rversion[node], feedback_fn)
911       bad = bad or result
912
913       # node_volume
914       volumeinfo = all_volumeinfo[node]
915
916       if isinstance(volumeinfo, basestring):
917         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
918                     (node, volumeinfo[-400:].encode('string_escape')))
919         bad = True
920         node_volume[node] = {}
921       elif not isinstance(volumeinfo, dict):
922         feedback_fn("  - ERROR: connection to %s failed" % (node,))
923         bad = True
924         continue
925       else:
926         node_volume[node] = volumeinfo
927
928       # node_instance
929       nodeinstance = all_instanceinfo[node]
930       if type(nodeinstance) != list:
931         feedback_fn("  - ERROR: connection to %s failed" % (node,))
932         bad = True
933         continue
934
935       node_instance[node] = nodeinstance
936
937       # node_info
938       nodeinfo = all_ninfo[node]
939       if not isinstance(nodeinfo, dict):
940         feedback_fn("  - ERROR: connection to %s failed" % (node,))
941         bad = True
942         continue
943
944       try:
945         node_info[node] = {
946           "mfree": int(nodeinfo['memory_free']),
947           "dfree": int(nodeinfo['vg_free']),
948           "pinst": [],
949           "sinst": [],
950           # dictionary holding all instances this node is secondary for,
951           # grouped by their primary node. Each key is a cluster node, and each
952           # value is a list of instances which have the key as primary and the
953           # current node as secondary.  this is handy to calculate N+1 memory
954           # availability if you can only failover from a primary to its
955           # secondary.
956           "sinst-by-pnode": {},
957         }
958       except ValueError:
959         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
960         bad = True
961         continue
962
963     node_vol_should = {}
964
965     for instance in instancelist:
966       feedback_fn("* Verifying instance %s" % instance)
967       inst_config = self.cfg.GetInstanceInfo(instance)
968       result =  self._VerifyInstance(instance, inst_config, node_volume,
969                                      node_instance, feedback_fn)
970       bad = bad or result
971
972       inst_config.MapLVsByNode(node_vol_should)
973
974       instance_cfg[instance] = inst_config
975
976       pnode = inst_config.primary_node
977       if pnode in node_info:
978         node_info[pnode]['pinst'].append(instance)
979       else:
980         feedback_fn("  - ERROR: instance %s, connection to primary node"
981                     " %s failed" % (instance, pnode))
982         bad = True
983
984       # If the instance is non-redundant we cannot survive losing its primary
985       # node, so we are not N+1 compliant. On the other hand we have no disk
986       # templates with more than one secondary so that situation is not well
987       # supported either.
988       # FIXME: does not support file-backed instances
989       if len(inst_config.secondary_nodes) == 0:
990         i_non_redundant.append(instance)
991       elif len(inst_config.secondary_nodes) > 1:
992         feedback_fn("  - WARNING: multiple secondaries for instance %s"
993                     % instance)
994
995       for snode in inst_config.secondary_nodes:
996         if snode in node_info:
997           node_info[snode]['sinst'].append(instance)
998           if pnode not in node_info[snode]['sinst-by-pnode']:
999             node_info[snode]['sinst-by-pnode'][pnode] = []
1000           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1001         else:
1002           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1003                       " %s failed" % (instance, snode))
1004
1005     feedback_fn("* Verifying orphan volumes")
1006     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1007                                        feedback_fn)
1008     bad = bad or result
1009
1010     feedback_fn("* Verifying remaining instances")
1011     result = self._VerifyOrphanInstances(instancelist, node_instance,
1012                                          feedback_fn)
1013     bad = bad or result
1014
1015     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1016       feedback_fn("* Verifying N+1 Memory redundancy")
1017       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1018       bad = bad or result
1019
1020     feedback_fn("* Other Notes")
1021     if i_non_redundant:
1022       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1023                   % len(i_non_redundant))
1024
1025     return int(bad)
1026
1027   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1028     """Analize the post-hooks' result, handle it, and send some
1029     nicely-formatted feedback back to the user.
1030
1031     Args:
1032       phase: the hooks phase that has just been run
1033       hooks_results: the results of the multi-node hooks rpc call
1034       feedback_fn: function to send feedback back to the caller
1035       lu_result: previous Exec result
1036
1037     """
1038     # We only really run POST phase hooks, and are only interested in their results
1039     if phase == constants.HOOKS_PHASE_POST:
1040       # Used to change hooks' output to proper indentation
1041       indent_re = re.compile('^', re.M)
1042       feedback_fn("* Hooks Results")
1043       if not hooks_results:
1044         feedback_fn("  - ERROR: general communication failure")
1045         lu_result = 1
1046       else:
1047         for node_name in hooks_results:
1048           show_node_header = True
1049           res = hooks_results[node_name]
1050           if res is False or not isinstance(res, list):
1051             feedback_fn("    Communication failure")
1052             lu_result = 1
1053             continue
1054           for script, hkr, output in res:
1055             if hkr == constants.HKR_FAIL:
1056               # The node header is only shown once, if there are
1057               # failing hooks on that node
1058               if show_node_header:
1059                 feedback_fn("  Node %s:" % node_name)
1060                 show_node_header = False
1061               feedback_fn("    ERROR: Script %s failed, output:" % script)
1062               output = indent_re.sub('      ', output)
1063               feedback_fn("%s" % output)
1064               lu_result = 1
1065
1066       return lu_result
1067
1068
1069 class LUVerifyDisks(NoHooksLU):
1070   """Verifies the cluster disks status.
1071
1072   """
1073   _OP_REQP = []
1074
1075   def CheckPrereq(self):
1076     """Check prerequisites.
1077
1078     This has no prerequisites.
1079
1080     """
1081     pass
1082
1083   def Exec(self, feedback_fn):
1084     """Verify integrity of cluster disks.
1085
1086     """
1087     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1088
1089     vg_name = self.cfg.GetVGName()
1090     nodes = utils.NiceSort(self.cfg.GetNodeList())
1091     instances = [self.cfg.GetInstanceInfo(name)
1092                  for name in self.cfg.GetInstanceList()]
1093
1094     nv_dict = {}
1095     for inst in instances:
1096       inst_lvs = {}
1097       if (inst.status != "up" or
1098           inst.disk_template not in constants.DTS_NET_MIRROR):
1099         continue
1100       inst.MapLVsByNode(inst_lvs)
1101       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1102       for node, vol_list in inst_lvs.iteritems():
1103         for vol in vol_list:
1104           nv_dict[(node, vol)] = inst
1105
1106     if not nv_dict:
1107       return result
1108
1109     node_lvs = rpc.call_volume_list(nodes, vg_name)
1110
1111     to_act = set()
1112     for node in nodes:
1113       # node_volume
1114       lvs = node_lvs[node]
1115
1116       if isinstance(lvs, basestring):
1117         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1118         res_nlvm[node] = lvs
1119       elif not isinstance(lvs, dict):
1120         logger.Info("connection to node %s failed or invalid data returned" %
1121                     (node,))
1122         res_nodes.append(node)
1123         continue
1124
1125       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1126         inst = nv_dict.pop((node, lv_name), None)
1127         if (not lv_online and inst is not None
1128             and inst.name not in res_instances):
1129           res_instances.append(inst.name)
1130
1131     # any leftover items in nv_dict are missing LVs, let's arrange the
1132     # data better
1133     for key, inst in nv_dict.iteritems():
1134       if inst.name not in res_missing:
1135         res_missing[inst.name] = []
1136       res_missing[inst.name].append(key)
1137
1138     return result
1139
1140
1141 class LURenameCluster(LogicalUnit):
1142   """Rename the cluster.
1143
1144   """
1145   HPATH = "cluster-rename"
1146   HTYPE = constants.HTYPE_CLUSTER
1147   _OP_REQP = ["name"]
1148
1149   def BuildHooksEnv(self):
1150     """Build hooks env.
1151
1152     """
1153     env = {
1154       "OP_TARGET": self.sstore.GetClusterName(),
1155       "NEW_NAME": self.op.name,
1156       }
1157     mn = self.sstore.GetMasterNode()
1158     return env, [mn], [mn]
1159
1160   def CheckPrereq(self):
1161     """Verify that the passed name is a valid one.
1162
1163     """
1164     hostname = utils.HostInfo(self.op.name)
1165
1166     new_name = hostname.name
1167     self.ip = new_ip = hostname.ip
1168     old_name = self.sstore.GetClusterName()
1169     old_ip = self.sstore.GetMasterIP()
1170     if new_name == old_name and new_ip == old_ip:
1171       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1172                                  " cluster has changed")
1173     if new_ip != old_ip:
1174       result = utils.RunCmd(["fping", "-q", new_ip])
1175       if not result.failed:
1176         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1177                                    " reachable on the network. Aborting." %
1178                                    new_ip)
1179
1180     self.op.name = new_name
1181
1182   def Exec(self, feedback_fn):
1183     """Rename the cluster.
1184
1185     """
1186     clustername = self.op.name
1187     ip = self.ip
1188     ss = self.sstore
1189
1190     # shutdown the master IP
1191     master = ss.GetMasterNode()
1192     if not rpc.call_node_stop_master(master):
1193       raise errors.OpExecError("Could not disable the master role")
1194
1195     try:
1196       # modify the sstore
1197       ss.SetKey(ss.SS_MASTER_IP, ip)
1198       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1199
1200       # Distribute updated ss config to all nodes
1201       myself = self.cfg.GetNodeInfo(master)
1202       dist_nodes = self.cfg.GetNodeList()
1203       if myself.name in dist_nodes:
1204         dist_nodes.remove(myself.name)
1205
1206       logger.Debug("Copying updated ssconf data to all nodes")
1207       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1208         fname = ss.KeyToFilename(keyname)
1209         result = rpc.call_upload_file(dist_nodes, fname)
1210         for to_node in dist_nodes:
1211           if not result[to_node]:
1212             logger.Error("copy of file %s to node %s failed" %
1213                          (fname, to_node))
1214     finally:
1215       if not rpc.call_node_start_master(master):
1216         logger.Error("Could not re-enable the master role on the master,"
1217                      " please restart manually.")
1218
1219
1220 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1221   """Sleep and poll for an instance's disk to sync.
1222
1223   """
1224   if not instance.disks:
1225     return True
1226
1227   if not oneshot:
1228     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1229
1230   node = instance.primary_node
1231
1232   for dev in instance.disks:
1233     cfgw.SetDiskID(dev, node)
1234
1235   retries = 0
1236   while True:
1237     max_time = 0
1238     done = True
1239     cumul_degraded = False
1240     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1241     if not rstats:
1242       proc.LogWarning("Can't get any data from node %s" % node)
1243       retries += 1
1244       if retries >= 10:
1245         raise errors.RemoteError("Can't contact node %s for mirror data,"
1246                                  " aborting." % node)
1247       time.sleep(6)
1248       continue
1249     retries = 0
1250     for i in range(len(rstats)):
1251       mstat = rstats[i]
1252       if mstat is None:
1253         proc.LogWarning("Can't compute data for node %s/%s" %
1254                         (node, instance.disks[i].iv_name))
1255         continue
1256       # we ignore the ldisk parameter
1257       perc_done, est_time, is_degraded, _ = mstat
1258       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1259       if perc_done is not None:
1260         done = False
1261         if est_time is not None:
1262           rem_time = "%d estimated seconds remaining" % est_time
1263           max_time = est_time
1264         else:
1265           rem_time = "no time estimate"
1266         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1267                      (instance.disks[i].iv_name, perc_done, rem_time))
1268     if done or oneshot:
1269       break
1270
1271     if unlock:
1272       utils.Unlock('cmd')
1273     try:
1274       time.sleep(min(60, max_time))
1275     finally:
1276       if unlock:
1277         utils.Lock('cmd')
1278
1279   if done:
1280     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1281   return not cumul_degraded
1282
1283
1284 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1285   """Check that mirrors are not degraded.
1286
1287   The ldisk parameter, if True, will change the test from the
1288   is_degraded attribute (which represents overall non-ok status for
1289   the device(s)) to the ldisk (representing the local storage status).
1290
1291   """
1292   cfgw.SetDiskID(dev, node)
1293   if ldisk:
1294     idx = 6
1295   else:
1296     idx = 5
1297
1298   result = True
1299   if on_primary or dev.AssembleOnSecondary():
1300     rstats = rpc.call_blockdev_find(node, dev)
1301     if not rstats:
1302       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1303       result = False
1304     else:
1305       result = result and (not rstats[idx])
1306   if dev.children:
1307     for child in dev.children:
1308       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1309
1310   return result
1311
1312
1313 class LUDiagnoseOS(NoHooksLU):
1314   """Logical unit for OS diagnose/query.
1315
1316   """
1317   _OP_REQP = ["output_fields", "names"]
1318
1319   def CheckPrereq(self):
1320     """Check prerequisites.
1321
1322     This always succeeds, since this is a pure query LU.
1323
1324     """
1325     if self.op.names:
1326       raise errors.OpPrereqError("Selective OS query not supported")
1327
1328     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1329     _CheckOutputFields(static=[],
1330                        dynamic=self.dynamic_fields,
1331                        selected=self.op.output_fields)
1332
1333   @staticmethod
1334   def _DiagnoseByOS(node_list, rlist):
1335     """Remaps a per-node return list into an a per-os per-node dictionary
1336
1337       Args:
1338         node_list: a list with the names of all nodes
1339         rlist: a map with node names as keys and OS objects as values
1340
1341       Returns:
1342         map: a map with osnames as keys and as value another map, with
1343              nodes as
1344              keys and list of OS objects as values
1345              e.g. {"debian-etch": {"node1": [<object>,...],
1346                                    "node2": [<object>,]}
1347                   }
1348
1349     """
1350     all_os = {}
1351     for node_name, nr in rlist.iteritems():
1352       if not nr:
1353         continue
1354       for os in nr:
1355         if os.name not in all_os:
1356           # build a list of nodes for this os containing empty lists
1357           # for each node in node_list
1358           all_os[os.name] = {}
1359           for nname in node_list:
1360             all_os[os.name][nname] = []
1361         all_os[os.name][node_name].append(os)
1362     return all_os
1363
1364   def Exec(self, feedback_fn):
1365     """Compute the list of OSes.
1366
1367     """
1368     node_list = self.cfg.GetNodeList()
1369     node_data = rpc.call_os_diagnose(node_list)
1370     if node_data == False:
1371       raise errors.OpExecError("Can't gather the list of OSes")
1372     pol = self._DiagnoseByOS(node_list, node_data)
1373     output = []
1374     for os_name, os_data in pol.iteritems():
1375       row = []
1376       for field in self.op.output_fields:
1377         if field == "name":
1378           val = os_name
1379         elif field == "valid":
1380           val = utils.all([osl and osl[0] for osl in os_data.values()])
1381         elif field == "node_status":
1382           val = {}
1383           for node_name, nos_list in os_data.iteritems():
1384             val[node_name] = [(v.status, v.path) for v in nos_list]
1385         else:
1386           raise errors.ParameterError(field)
1387         row.append(val)
1388       output.append(row)
1389
1390     return output
1391
1392
1393 class LURemoveNode(LogicalUnit):
1394   """Logical unit for removing a node.
1395
1396   """
1397   HPATH = "node-remove"
1398   HTYPE = constants.HTYPE_NODE
1399   _OP_REQP = ["node_name"]
1400
1401   def BuildHooksEnv(self):
1402     """Build hooks env.
1403
1404     This doesn't run on the target node in the pre phase as a failed
1405     node would not allows itself to run.
1406
1407     """
1408     env = {
1409       "OP_TARGET": self.op.node_name,
1410       "NODE_NAME": self.op.node_name,
1411       }
1412     all_nodes = self.cfg.GetNodeList()
1413     all_nodes.remove(self.op.node_name)
1414     return env, all_nodes, all_nodes
1415
1416   def CheckPrereq(self):
1417     """Check prerequisites.
1418
1419     This checks:
1420      - the node exists in the configuration
1421      - it does not have primary or secondary instances
1422      - it's not the master
1423
1424     Any errors are signalled by raising errors.OpPrereqError.
1425
1426     """
1427     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1428     if node is None:
1429       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1430
1431     instance_list = self.cfg.GetInstanceList()
1432
1433     masternode = self.sstore.GetMasterNode()
1434     if node.name == masternode:
1435       raise errors.OpPrereqError("Node is the master node,"
1436                                  " you need to failover first.")
1437
1438     for instance_name in instance_list:
1439       instance = self.cfg.GetInstanceInfo(instance_name)
1440       if node.name == instance.primary_node:
1441         raise errors.OpPrereqError("Instance %s still running on the node,"
1442                                    " please remove first." % instance_name)
1443       if node.name in instance.secondary_nodes:
1444         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1445                                    " please remove first." % instance_name)
1446     self.op.node_name = node.name
1447     self.node = node
1448
1449   def Exec(self, feedback_fn):
1450     """Removes the node from the cluster.
1451
1452     """
1453     node = self.node
1454     logger.Info("stopping the node daemon and removing configs from node %s" %
1455                 node.name)
1456
1457     rpc.call_node_leave_cluster(node.name)
1458
1459     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1460
1461     logger.Info("Removing node %s from config" % node.name)
1462
1463     self.cfg.RemoveNode(node.name)
1464
1465     _RemoveHostFromEtcHosts(node.name)
1466
1467
1468 class LUQueryNodes(NoHooksLU):
1469   """Logical unit for querying nodes.
1470
1471   """
1472   _OP_REQP = ["output_fields", "names"]
1473
1474   def CheckPrereq(self):
1475     """Check prerequisites.
1476
1477     This checks that the fields required are valid output fields.
1478
1479     """
1480     self.dynamic_fields = frozenset([
1481       "dtotal", "dfree",
1482       "mtotal", "mnode", "mfree",
1483       "bootid",
1484       "ctotal",
1485       ])
1486
1487     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1488                                "pinst_list", "sinst_list",
1489                                "pip", "sip"],
1490                        dynamic=self.dynamic_fields,
1491                        selected=self.op.output_fields)
1492
1493     self.wanted = _GetWantedNodes(self, self.op.names)
1494
1495   def Exec(self, feedback_fn):
1496     """Computes the list of nodes and their attributes.
1497
1498     """
1499     nodenames = self.wanted
1500     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1501
1502     # begin data gathering
1503
1504     if self.dynamic_fields.intersection(self.op.output_fields):
1505       live_data = {}
1506       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1507       for name in nodenames:
1508         nodeinfo = node_data.get(name, None)
1509         if nodeinfo:
1510           live_data[name] = {
1511             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1512             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1513             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1514             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1515             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1516             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1517             "bootid": nodeinfo['bootid'],
1518             }
1519         else:
1520           live_data[name] = {}
1521     else:
1522       live_data = dict.fromkeys(nodenames, {})
1523
1524     node_to_primary = dict([(name, set()) for name in nodenames])
1525     node_to_secondary = dict([(name, set()) for name in nodenames])
1526
1527     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1528                              "sinst_cnt", "sinst_list"))
1529     if inst_fields & frozenset(self.op.output_fields):
1530       instancelist = self.cfg.GetInstanceList()
1531
1532       for instance_name in instancelist:
1533         inst = self.cfg.GetInstanceInfo(instance_name)
1534         if inst.primary_node in node_to_primary:
1535           node_to_primary[inst.primary_node].add(inst.name)
1536         for secnode in inst.secondary_nodes:
1537           if secnode in node_to_secondary:
1538             node_to_secondary[secnode].add(inst.name)
1539
1540     # end data gathering
1541
1542     output = []
1543     for node in nodelist:
1544       node_output = []
1545       for field in self.op.output_fields:
1546         if field == "name":
1547           val = node.name
1548         elif field == "pinst_list":
1549           val = list(node_to_primary[node.name])
1550         elif field == "sinst_list":
1551           val = list(node_to_secondary[node.name])
1552         elif field == "pinst_cnt":
1553           val = len(node_to_primary[node.name])
1554         elif field == "sinst_cnt":
1555           val = len(node_to_secondary[node.name])
1556         elif field == "pip":
1557           val = node.primary_ip
1558         elif field == "sip":
1559           val = node.secondary_ip
1560         elif field in self.dynamic_fields:
1561           val = live_data[node.name].get(field, None)
1562         else:
1563           raise errors.ParameterError(field)
1564         node_output.append(val)
1565       output.append(node_output)
1566
1567     return output
1568
1569
1570 class LUQueryNodeVolumes(NoHooksLU):
1571   """Logical unit for getting volumes on node(s).
1572
1573   """
1574   _OP_REQP = ["nodes", "output_fields"]
1575
1576   def CheckPrereq(self):
1577     """Check prerequisites.
1578
1579     This checks that the fields required are valid output fields.
1580
1581     """
1582     self.nodes = _GetWantedNodes(self, self.op.nodes)
1583
1584     _CheckOutputFields(static=["node"],
1585                        dynamic=["phys", "vg", "name", "size", "instance"],
1586                        selected=self.op.output_fields)
1587
1588
1589   def Exec(self, feedback_fn):
1590     """Computes the list of nodes and their attributes.
1591
1592     """
1593     nodenames = self.nodes
1594     volumes = rpc.call_node_volumes(nodenames)
1595
1596     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1597              in self.cfg.GetInstanceList()]
1598
1599     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1600
1601     output = []
1602     for node in nodenames:
1603       if node not in volumes or not volumes[node]:
1604         continue
1605
1606       node_vols = volumes[node][:]
1607       node_vols.sort(key=lambda vol: vol['dev'])
1608
1609       for vol in node_vols:
1610         node_output = []
1611         for field in self.op.output_fields:
1612           if field == "node":
1613             val = node
1614           elif field == "phys":
1615             val = vol['dev']
1616           elif field == "vg":
1617             val = vol['vg']
1618           elif field == "name":
1619             val = vol['name']
1620           elif field == "size":
1621             val = int(float(vol['size']))
1622           elif field == "instance":
1623             for inst in ilist:
1624               if node not in lv_by_node[inst]:
1625                 continue
1626               if vol['name'] in lv_by_node[inst][node]:
1627                 val = inst.name
1628                 break
1629             else:
1630               val = '-'
1631           else:
1632             raise errors.ParameterError(field)
1633           node_output.append(str(val))
1634
1635         output.append(node_output)
1636
1637     return output
1638
1639
1640 class LUAddNode(LogicalUnit):
1641   """Logical unit for adding node to the cluster.
1642
1643   """
1644   HPATH = "node-add"
1645   HTYPE = constants.HTYPE_NODE
1646   _OP_REQP = ["node_name"]
1647
1648   def BuildHooksEnv(self):
1649     """Build hooks env.
1650
1651     This will run on all nodes before, and on all nodes + the new node after.
1652
1653     """
1654     env = {
1655       "OP_TARGET": self.op.node_name,
1656       "NODE_NAME": self.op.node_name,
1657       "NODE_PIP": self.op.primary_ip,
1658       "NODE_SIP": self.op.secondary_ip,
1659       }
1660     nodes_0 = self.cfg.GetNodeList()
1661     nodes_1 = nodes_0 + [self.op.node_name, ]
1662     return env, nodes_0, nodes_1
1663
1664   def CheckPrereq(self):
1665     """Check prerequisites.
1666
1667     This checks:
1668      - the new node is not already in the config
1669      - it is resolvable
1670      - its parameters (single/dual homed) matches the cluster
1671
1672     Any errors are signalled by raising errors.OpPrereqError.
1673
1674     """
1675     node_name = self.op.node_name
1676     cfg = self.cfg
1677
1678     dns_data = utils.HostInfo(node_name)
1679
1680     node = dns_data.name
1681     primary_ip = self.op.primary_ip = dns_data.ip
1682     secondary_ip = getattr(self.op, "secondary_ip", None)
1683     if secondary_ip is None:
1684       secondary_ip = primary_ip
1685     if not utils.IsValidIP(secondary_ip):
1686       raise errors.OpPrereqError("Invalid secondary IP given")
1687     self.op.secondary_ip = secondary_ip
1688
1689     node_list = cfg.GetNodeList()
1690     if not self.op.readd and node in node_list:
1691       raise errors.OpPrereqError("Node %s is already in the configuration" %
1692                                  node)
1693     elif self.op.readd and node not in node_list:
1694       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1695
1696     for existing_node_name in node_list:
1697       existing_node = cfg.GetNodeInfo(existing_node_name)
1698
1699       if self.op.readd and node == existing_node_name:
1700         if (existing_node.primary_ip != primary_ip or
1701             existing_node.secondary_ip != secondary_ip):
1702           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1703                                      " address configuration as before")
1704         continue
1705
1706       if (existing_node.primary_ip == primary_ip or
1707           existing_node.secondary_ip == primary_ip or
1708           existing_node.primary_ip == secondary_ip or
1709           existing_node.secondary_ip == secondary_ip):
1710         raise errors.OpPrereqError("New node ip address(es) conflict with"
1711                                    " existing node %s" % existing_node.name)
1712
1713     # check that the type of the node (single versus dual homed) is the
1714     # same as for the master
1715     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1716     master_singlehomed = myself.secondary_ip == myself.primary_ip
1717     newbie_singlehomed = secondary_ip == primary_ip
1718     if master_singlehomed != newbie_singlehomed:
1719       if master_singlehomed:
1720         raise errors.OpPrereqError("The master has no private ip but the"
1721                                    " new node has one")
1722       else:
1723         raise errors.OpPrereqError("The master has a private ip but the"
1724                                    " new node doesn't have one")
1725
1726     # checks reachablity
1727     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1728       raise errors.OpPrereqError("Node not reachable by ping")
1729
1730     if not newbie_singlehomed:
1731       # check reachability from my secondary ip to newbie's secondary ip
1732       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1733                            source=myself.secondary_ip):
1734         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1735                                    " based ping to noded port")
1736
1737     self.new_node = objects.Node(name=node,
1738                                  primary_ip=primary_ip,
1739                                  secondary_ip=secondary_ip)
1740
1741     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1742       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1743         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1744                                    constants.VNC_PASSWORD_FILE)
1745
1746   def Exec(self, feedback_fn):
1747     """Adds the new node to the cluster.
1748
1749     """
1750     new_node = self.new_node
1751     node = new_node.name
1752
1753     # set up inter-node password and certificate and restarts the node daemon
1754     gntpass = self.sstore.GetNodeDaemonPassword()
1755     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1756       raise errors.OpExecError("ganeti password corruption detected")
1757     f = open(constants.SSL_CERT_FILE)
1758     try:
1759       gntpem = f.read(8192)
1760     finally:
1761       f.close()
1762     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1763     # so we use this to detect an invalid certificate; as long as the
1764     # cert doesn't contain this, the here-document will be correctly
1765     # parsed by the shell sequence below
1766     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1767       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1768     if not gntpem.endswith("\n"):
1769       raise errors.OpExecError("PEM must end with newline")
1770     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1771
1772     # and then connect with ssh to set password and start ganeti-noded
1773     # note that all the below variables are sanitized at this point,
1774     # either by being constants or by the checks above
1775     ss = self.sstore
1776     mycommand = ("umask 077 && "
1777                  "echo '%s' > '%s' && "
1778                  "cat > '%s' << '!EOF.' && \n"
1779                  "%s!EOF.\n%s restart" %
1780                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1781                   constants.SSL_CERT_FILE, gntpem,
1782                   constants.NODE_INITD_SCRIPT))
1783
1784     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1785     if result.failed:
1786       raise errors.OpExecError("Remote command on node %s, error: %s,"
1787                                " output: %s" %
1788                                (node, result.fail_reason, result.output))
1789
1790     # check connectivity
1791     time.sleep(4)
1792
1793     result = rpc.call_version([node])[node]
1794     if result:
1795       if constants.PROTOCOL_VERSION == result:
1796         logger.Info("communication to node %s fine, sw version %s match" %
1797                     (node, result))
1798       else:
1799         raise errors.OpExecError("Version mismatch master version %s,"
1800                                  " node version %s" %
1801                                  (constants.PROTOCOL_VERSION, result))
1802     else:
1803       raise errors.OpExecError("Cannot get version from the new node")
1804
1805     # setup ssh on node
1806     logger.Info("copy ssh key to node %s" % node)
1807     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1808     keyarray = []
1809     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1810                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1811                 priv_key, pub_key]
1812
1813     for i in keyfiles:
1814       f = open(i, 'r')
1815       try:
1816         keyarray.append(f.read())
1817       finally:
1818         f.close()
1819
1820     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1821                                keyarray[3], keyarray[4], keyarray[5])
1822
1823     if not result:
1824       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1825
1826     # Add node to our /etc/hosts, and add key to known_hosts
1827     _AddHostToEtcHosts(new_node.name)
1828
1829     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1830                       self.cfg.GetHostKey())
1831
1832     if new_node.secondary_ip != new_node.primary_ip:
1833       if not rpc.call_node_tcp_ping(new_node.name,
1834                                     constants.LOCALHOST_IP_ADDRESS,
1835                                     new_node.secondary_ip,
1836                                     constants.DEFAULT_NODED_PORT,
1837                                     10, False):
1838         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1839                                  " you gave (%s). Please fix and re-run this"
1840                                  " command." % new_node.secondary_ip)
1841
1842     success, msg = ssh.VerifyNodeHostname(node)
1843     if not success:
1844       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1845                                " than the one the resolver gives: %s."
1846                                " Please fix and re-run this command." %
1847                                (node, msg))
1848
1849     # Distribute updated /etc/hosts and known_hosts to all nodes,
1850     # including the node just added
1851     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1852     dist_nodes = self.cfg.GetNodeList()
1853     if not self.op.readd:
1854       dist_nodes.append(node)
1855     if myself.name in dist_nodes:
1856       dist_nodes.remove(myself.name)
1857
1858     logger.Debug("Copying hosts and known_hosts to all nodes")
1859     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1860       result = rpc.call_upload_file(dist_nodes, fname)
1861       for to_node in dist_nodes:
1862         if not result[to_node]:
1863           logger.Error("copy of file %s to node %s failed" %
1864                        (fname, to_node))
1865
1866     to_copy = ss.GetFileList()
1867     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1868       to_copy.append(constants.VNC_PASSWORD_FILE)
1869     for fname in to_copy:
1870       if not ssh.CopyFileToNode(node, fname):
1871         logger.Error("could not copy file %s to node %s" % (fname, node))
1872
1873     if not self.op.readd:
1874       logger.Info("adding node %s to cluster.conf" % node)
1875       self.cfg.AddNode(new_node)
1876
1877
1878 class LUMasterFailover(LogicalUnit):
1879   """Failover the master node to the current node.
1880
1881   This is a special LU in that it must run on a non-master node.
1882
1883   """
1884   HPATH = "master-failover"
1885   HTYPE = constants.HTYPE_CLUSTER
1886   REQ_MASTER = False
1887   _OP_REQP = []
1888
1889   def BuildHooksEnv(self):
1890     """Build hooks env.
1891
1892     This will run on the new master only in the pre phase, and on all
1893     the nodes in the post phase.
1894
1895     """
1896     env = {
1897       "OP_TARGET": self.new_master,
1898       "NEW_MASTER": self.new_master,
1899       "OLD_MASTER": self.old_master,
1900       }
1901     return env, [self.new_master], self.cfg.GetNodeList()
1902
1903   def CheckPrereq(self):
1904     """Check prerequisites.
1905
1906     This checks that we are not already the master.
1907
1908     """
1909     self.new_master = utils.HostInfo().name
1910     self.old_master = self.sstore.GetMasterNode()
1911
1912     if self.old_master == self.new_master:
1913       raise errors.OpPrereqError("This commands must be run on the node"
1914                                  " where you want the new master to be."
1915                                  " %s is already the master" %
1916                                  self.old_master)
1917
1918   def Exec(self, feedback_fn):
1919     """Failover the master node.
1920
1921     This command, when run on a non-master node, will cause the current
1922     master to cease being master, and the non-master to become new
1923     master.
1924
1925     """
1926     #TODO: do not rely on gethostname returning the FQDN
1927     logger.Info("setting master to %s, old master: %s" %
1928                 (self.new_master, self.old_master))
1929
1930     if not rpc.call_node_stop_master(self.old_master):
1931       logger.Error("could disable the master role on the old master"
1932                    " %s, please disable manually" % self.old_master)
1933
1934     ss = self.sstore
1935     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1936     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1937                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1938       logger.Error("could not distribute the new simple store master file"
1939                    " to the other nodes, please check.")
1940
1941     if not rpc.call_node_start_master(self.new_master):
1942       logger.Error("could not start the master role on the new master"
1943                    " %s, please check" % self.new_master)
1944       feedback_fn("Error in activating the master IP on the new master,"
1945                   " please fix manually.")
1946
1947
1948
1949 class LUQueryClusterInfo(NoHooksLU):
1950   """Query cluster configuration.
1951
1952   """
1953   _OP_REQP = []
1954   REQ_MASTER = False
1955
1956   def CheckPrereq(self):
1957     """No prerequsites needed for this LU.
1958
1959     """
1960     pass
1961
1962   def Exec(self, feedback_fn):
1963     """Return cluster config.
1964
1965     """
1966     result = {
1967       "name": self.sstore.GetClusterName(),
1968       "software_version": constants.RELEASE_VERSION,
1969       "protocol_version": constants.PROTOCOL_VERSION,
1970       "config_version": constants.CONFIG_VERSION,
1971       "os_api_version": constants.OS_API_VERSION,
1972       "export_version": constants.EXPORT_VERSION,
1973       "master": self.sstore.GetMasterNode(),
1974       "architecture": (platform.architecture()[0], platform.machine()),
1975       "hypervisor_type": self.sstore.GetHypervisorType(),
1976       }
1977
1978     return result
1979
1980
1981 class LUClusterCopyFile(NoHooksLU):
1982   """Copy file to cluster.
1983
1984   """
1985   _OP_REQP = ["nodes", "filename"]
1986
1987   def CheckPrereq(self):
1988     """Check prerequisites.
1989
1990     It should check that the named file exists and that the given list
1991     of nodes is valid.
1992
1993     """
1994     if not os.path.exists(self.op.filename):
1995       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1996
1997     self.nodes = _GetWantedNodes(self, self.op.nodes)
1998
1999   def Exec(self, feedback_fn):
2000     """Copy a file from master to some nodes.
2001
2002     Args:
2003       opts - class with options as members
2004       args - list containing a single element, the file name
2005     Opts used:
2006       nodes - list containing the name of target nodes; if empty, all nodes
2007
2008     """
2009     filename = self.op.filename
2010
2011     myname = utils.HostInfo().name
2012
2013     for node in self.nodes:
2014       if node == myname:
2015         continue
2016       if not ssh.CopyFileToNode(node, filename):
2017         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2018
2019
2020 class LUDumpClusterConfig(NoHooksLU):
2021   """Return a text-representation of the cluster-config.
2022
2023   """
2024   _OP_REQP = []
2025
2026   def CheckPrereq(self):
2027     """No prerequisites.
2028
2029     """
2030     pass
2031
2032   def Exec(self, feedback_fn):
2033     """Dump a representation of the cluster config to the standard output.
2034
2035     """
2036     return self.cfg.DumpConfig()
2037
2038
2039 class LURunClusterCommand(NoHooksLU):
2040   """Run a command on some nodes.
2041
2042   """
2043   _OP_REQP = ["command", "nodes"]
2044
2045   def CheckPrereq(self):
2046     """Check prerequisites.
2047
2048     It checks that the given list of nodes is valid.
2049
2050     """
2051     self.nodes = _GetWantedNodes(self, self.op.nodes)
2052
2053   def Exec(self, feedback_fn):
2054     """Run a command on some nodes.
2055
2056     """
2057     # put the master at the end of the nodes list
2058     master_node = self.sstore.GetMasterNode()
2059     if master_node in self.nodes:
2060       self.nodes.remove(master_node)
2061       self.nodes.append(master_node)
2062
2063     data = []
2064     for node in self.nodes:
2065       result = ssh.SSHCall(node, "root", self.op.command)
2066       data.append((node, result.output, result.exit_code))
2067
2068     return data
2069
2070
2071 class LUActivateInstanceDisks(NoHooksLU):
2072   """Bring up an instance's disks.
2073
2074   """
2075   _OP_REQP = ["instance_name"]
2076
2077   def CheckPrereq(self):
2078     """Check prerequisites.
2079
2080     This checks that the instance is in the cluster.
2081
2082     """
2083     instance = self.cfg.GetInstanceInfo(
2084       self.cfg.ExpandInstanceName(self.op.instance_name))
2085     if instance is None:
2086       raise errors.OpPrereqError("Instance '%s' not known" %
2087                                  self.op.instance_name)
2088     self.instance = instance
2089
2090
2091   def Exec(self, feedback_fn):
2092     """Activate the disks.
2093
2094     """
2095     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2096     if not disks_ok:
2097       raise errors.OpExecError("Cannot activate block devices")
2098
2099     return disks_info
2100
2101
2102 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2103   """Prepare the block devices for an instance.
2104
2105   This sets up the block devices on all nodes.
2106
2107   Args:
2108     instance: a ganeti.objects.Instance object
2109     ignore_secondaries: if true, errors on secondary nodes won't result
2110                         in an error return from the function
2111
2112   Returns:
2113     false if the operation failed
2114     list of (host, instance_visible_name, node_visible_name) if the operation
2115          suceeded with the mapping from node devices to instance devices
2116   """
2117   device_info = []
2118   disks_ok = True
2119   iname = instance.name
2120   # With the two passes mechanism we try to reduce the window of
2121   # opportunity for the race condition of switching DRBD to primary
2122   # before handshaking occured, but we do not eliminate it
2123
2124   # The proper fix would be to wait (with some limits) until the
2125   # connection has been made and drbd transitions from WFConnection
2126   # into any other network-connected state (Connected, SyncTarget,
2127   # SyncSource, etc.)
2128
2129   # 1st pass, assemble on all nodes in secondary mode
2130   for inst_disk in instance.disks:
2131     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2132       cfg.SetDiskID(node_disk, node)
2133       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2134       if not result:
2135         logger.Error("could not prepare block device %s on node %s"
2136                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2137         if not ignore_secondaries:
2138           disks_ok = False
2139
2140   # FIXME: race condition on drbd migration to primary
2141
2142   # 2nd pass, do only the primary node
2143   for inst_disk in instance.disks:
2144     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2145       if node != instance.primary_node:
2146         continue
2147       cfg.SetDiskID(node_disk, node)
2148       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2149       if not result:
2150         logger.Error("could not prepare block device %s on node %s"
2151                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2152         disks_ok = False
2153     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2154
2155   # leave the disks configured for the primary node
2156   # this is a workaround that would be fixed better by
2157   # improving the logical/physical id handling
2158   for disk in instance.disks:
2159     cfg.SetDiskID(disk, instance.primary_node)
2160
2161   return disks_ok, device_info
2162
2163
2164 def _StartInstanceDisks(cfg, instance, force):
2165   """Start the disks of an instance.
2166
2167   """
2168   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2169                                            ignore_secondaries=force)
2170   if not disks_ok:
2171     _ShutdownInstanceDisks(instance, cfg)
2172     if force is not None and not force:
2173       logger.Error("If the message above refers to a secondary node,"
2174                    " you can retry the operation using '--force'.")
2175     raise errors.OpExecError("Disk consistency error")
2176
2177
2178 class LUDeactivateInstanceDisks(NoHooksLU):
2179   """Shutdown an instance's disks.
2180
2181   """
2182   _OP_REQP = ["instance_name"]
2183
2184   def CheckPrereq(self):
2185     """Check prerequisites.
2186
2187     This checks that the instance is in the cluster.
2188
2189     """
2190     instance = self.cfg.GetInstanceInfo(
2191       self.cfg.ExpandInstanceName(self.op.instance_name))
2192     if instance is None:
2193       raise errors.OpPrereqError("Instance '%s' not known" %
2194                                  self.op.instance_name)
2195     self.instance = instance
2196
2197   def Exec(self, feedback_fn):
2198     """Deactivate the disks
2199
2200     """
2201     instance = self.instance
2202     ins_l = rpc.call_instance_list([instance.primary_node])
2203     ins_l = ins_l[instance.primary_node]
2204     if not type(ins_l) is list:
2205       raise errors.OpExecError("Can't contact node '%s'" %
2206                                instance.primary_node)
2207
2208     if self.instance.name in ins_l:
2209       raise errors.OpExecError("Instance is running, can't shutdown"
2210                                " block devices.")
2211
2212     _ShutdownInstanceDisks(instance, self.cfg)
2213
2214
2215 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2216   """Shutdown block devices of an instance.
2217
2218   This does the shutdown on all nodes of the instance.
2219
2220   If the ignore_primary is false, errors on the primary node are
2221   ignored.
2222
2223   """
2224   result = True
2225   for disk in instance.disks:
2226     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2227       cfg.SetDiskID(top_disk, node)
2228       if not rpc.call_blockdev_shutdown(node, top_disk):
2229         logger.Error("could not shutdown block device %s on node %s" %
2230                      (disk.iv_name, node))
2231         if not ignore_primary or node != instance.primary_node:
2232           result = False
2233   return result
2234
2235
2236 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2237   """Checks if a node has enough free memory.
2238
2239   This function check if a given node has the needed amount of free
2240   memory. In case the node has less memory or we cannot get the
2241   information from the node, this function raise an OpPrereqError
2242   exception.
2243
2244   Args:
2245     - cfg: a ConfigWriter instance
2246     - node: the node name
2247     - reason: string to use in the error message
2248     - requested: the amount of memory in MiB
2249
2250   """
2251   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2252   if not nodeinfo or not isinstance(nodeinfo, dict):
2253     raise errors.OpPrereqError("Could not contact node %s for resource"
2254                              " information" % (node,))
2255
2256   free_mem = nodeinfo[node].get('memory_free')
2257   if not isinstance(free_mem, int):
2258     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2259                              " was '%s'" % (node, free_mem))
2260   if requested > free_mem:
2261     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2262                              " needed %s MiB, available %s MiB" %
2263                              (node, reason, requested, free_mem))
2264
2265
2266 class LUStartupInstance(LogicalUnit):
2267   """Starts an instance.
2268
2269   """
2270   HPATH = "instance-start"
2271   HTYPE = constants.HTYPE_INSTANCE
2272   _OP_REQP = ["instance_name", "force"]
2273
2274   def BuildHooksEnv(self):
2275     """Build hooks env.
2276
2277     This runs on master, primary and secondary nodes of the instance.
2278
2279     """
2280     env = {
2281       "FORCE": self.op.force,
2282       }
2283     env.update(_BuildInstanceHookEnvByObject(self.instance))
2284     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2285           list(self.instance.secondary_nodes))
2286     return env, nl, nl
2287
2288   def CheckPrereq(self):
2289     """Check prerequisites.
2290
2291     This checks that the instance is in the cluster.
2292
2293     """
2294     instance = self.cfg.GetInstanceInfo(
2295       self.cfg.ExpandInstanceName(self.op.instance_name))
2296     if instance is None:
2297       raise errors.OpPrereqError("Instance '%s' not known" %
2298                                  self.op.instance_name)
2299
2300     # check bridges existance
2301     _CheckInstanceBridgesExist(instance)
2302
2303     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2304                          "starting instance %s" % instance.name,
2305                          instance.memory)
2306
2307     self.instance = instance
2308     self.op.instance_name = instance.name
2309
2310   def Exec(self, feedback_fn):
2311     """Start the instance.
2312
2313     """
2314     instance = self.instance
2315     force = self.op.force
2316     extra_args = getattr(self.op, "extra_args", "")
2317
2318     self.cfg.MarkInstanceUp(instance.name)
2319
2320     node_current = instance.primary_node
2321
2322     _StartInstanceDisks(self.cfg, instance, force)
2323
2324     if not rpc.call_instance_start(node_current, instance, extra_args):
2325       _ShutdownInstanceDisks(instance, self.cfg)
2326       raise errors.OpExecError("Could not start instance")
2327
2328
2329 class LURebootInstance(LogicalUnit):
2330   """Reboot an instance.
2331
2332   """
2333   HPATH = "instance-reboot"
2334   HTYPE = constants.HTYPE_INSTANCE
2335   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2336
2337   def BuildHooksEnv(self):
2338     """Build hooks env.
2339
2340     This runs on master, primary and secondary nodes of the instance.
2341
2342     """
2343     env = {
2344       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2345       }
2346     env.update(_BuildInstanceHookEnvByObject(self.instance))
2347     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2348           list(self.instance.secondary_nodes))
2349     return env, nl, nl
2350
2351   def CheckPrereq(self):
2352     """Check prerequisites.
2353
2354     This checks that the instance is in the cluster.
2355
2356     """
2357     instance = self.cfg.GetInstanceInfo(
2358       self.cfg.ExpandInstanceName(self.op.instance_name))
2359     if instance is None:
2360       raise errors.OpPrereqError("Instance '%s' not known" %
2361                                  self.op.instance_name)
2362
2363     # check bridges existance
2364     _CheckInstanceBridgesExist(instance)
2365
2366     self.instance = instance
2367     self.op.instance_name = instance.name
2368
2369   def Exec(self, feedback_fn):
2370     """Reboot the instance.
2371
2372     """
2373     instance = self.instance
2374     ignore_secondaries = self.op.ignore_secondaries
2375     reboot_type = self.op.reboot_type
2376     extra_args = getattr(self.op, "extra_args", "")
2377
2378     node_current = instance.primary_node
2379
2380     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2381                            constants.INSTANCE_REBOOT_HARD,
2382                            constants.INSTANCE_REBOOT_FULL]:
2383       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2384                                   (constants.INSTANCE_REBOOT_SOFT,
2385                                    constants.INSTANCE_REBOOT_HARD,
2386                                    constants.INSTANCE_REBOOT_FULL))
2387
2388     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2389                        constants.INSTANCE_REBOOT_HARD]:
2390       if not rpc.call_instance_reboot(node_current, instance,
2391                                       reboot_type, extra_args):
2392         raise errors.OpExecError("Could not reboot instance")
2393     else:
2394       if not rpc.call_instance_shutdown(node_current, instance):
2395         raise errors.OpExecError("could not shutdown instance for full reboot")
2396       _ShutdownInstanceDisks(instance, self.cfg)
2397       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2398       if not rpc.call_instance_start(node_current, instance, extra_args):
2399         _ShutdownInstanceDisks(instance, self.cfg)
2400         raise errors.OpExecError("Could not start instance for full reboot")
2401
2402     self.cfg.MarkInstanceUp(instance.name)
2403
2404
2405 class LUShutdownInstance(LogicalUnit):
2406   """Shutdown an instance.
2407
2408   """
2409   HPATH = "instance-stop"
2410   HTYPE = constants.HTYPE_INSTANCE
2411   _OP_REQP = ["instance_name"]
2412
2413   def BuildHooksEnv(self):
2414     """Build hooks env.
2415
2416     This runs on master, primary and secondary nodes of the instance.
2417
2418     """
2419     env = _BuildInstanceHookEnvByObject(self.instance)
2420     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2421           list(self.instance.secondary_nodes))
2422     return env, nl, nl
2423
2424   def CheckPrereq(self):
2425     """Check prerequisites.
2426
2427     This checks that the instance is in the cluster.
2428
2429     """
2430     instance = self.cfg.GetInstanceInfo(
2431       self.cfg.ExpandInstanceName(self.op.instance_name))
2432     if instance is None:
2433       raise errors.OpPrereqError("Instance '%s' not known" %
2434                                  self.op.instance_name)
2435     self.instance = instance
2436
2437   def Exec(self, feedback_fn):
2438     """Shutdown the instance.
2439
2440     """
2441     instance = self.instance
2442     node_current = instance.primary_node
2443     self.cfg.MarkInstanceDown(instance.name)
2444     if not rpc.call_instance_shutdown(node_current, instance):
2445       logger.Error("could not shutdown instance")
2446
2447     _ShutdownInstanceDisks(instance, self.cfg)
2448
2449
2450 class LUReinstallInstance(LogicalUnit):
2451   """Reinstall an instance.
2452
2453   """
2454   HPATH = "instance-reinstall"
2455   HTYPE = constants.HTYPE_INSTANCE
2456   _OP_REQP = ["instance_name"]
2457
2458   def BuildHooksEnv(self):
2459     """Build hooks env.
2460
2461     This runs on master, primary and secondary nodes of the instance.
2462
2463     """
2464     env = _BuildInstanceHookEnvByObject(self.instance)
2465     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2466           list(self.instance.secondary_nodes))
2467     return env, nl, nl
2468
2469   def CheckPrereq(self):
2470     """Check prerequisites.
2471
2472     This checks that the instance is in the cluster and is not running.
2473
2474     """
2475     instance = self.cfg.GetInstanceInfo(
2476       self.cfg.ExpandInstanceName(self.op.instance_name))
2477     if instance is None:
2478       raise errors.OpPrereqError("Instance '%s' not known" %
2479                                  self.op.instance_name)
2480     if instance.disk_template == constants.DT_DISKLESS:
2481       raise errors.OpPrereqError("Instance '%s' has no disks" %
2482                                  self.op.instance_name)
2483     if instance.status != "down":
2484       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2485                                  self.op.instance_name)
2486     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2487     if remote_info:
2488       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2489                                  (self.op.instance_name,
2490                                   instance.primary_node))
2491
2492     self.op.os_type = getattr(self.op, "os_type", None)
2493     if self.op.os_type is not None:
2494       # OS verification
2495       pnode = self.cfg.GetNodeInfo(
2496         self.cfg.ExpandNodeName(instance.primary_node))
2497       if pnode is None:
2498         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2499                                    self.op.pnode)
2500       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2501       if not os_obj:
2502         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2503                                    " primary node"  % self.op.os_type)
2504
2505     self.instance = instance
2506
2507   def Exec(self, feedback_fn):
2508     """Reinstall the instance.
2509
2510     """
2511     inst = self.instance
2512
2513     if self.op.os_type is not None:
2514       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2515       inst.os = self.op.os_type
2516       self.cfg.AddInstance(inst)
2517
2518     _StartInstanceDisks(self.cfg, inst, None)
2519     try:
2520       feedback_fn("Running the instance OS create scripts...")
2521       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2522         raise errors.OpExecError("Could not install OS for instance %s"
2523                                  " on node %s" %
2524                                  (inst.name, inst.primary_node))
2525     finally:
2526       _ShutdownInstanceDisks(inst, self.cfg)
2527
2528
2529 class LURenameInstance(LogicalUnit):
2530   """Rename an instance.
2531
2532   """
2533   HPATH = "instance-rename"
2534   HTYPE = constants.HTYPE_INSTANCE
2535   _OP_REQP = ["instance_name", "new_name"]
2536
2537   def BuildHooksEnv(self):
2538     """Build hooks env.
2539
2540     This runs on master, primary and secondary nodes of the instance.
2541
2542     """
2543     env = _BuildInstanceHookEnvByObject(self.instance)
2544     env["INSTANCE_NEW_NAME"] = self.op.new_name
2545     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2546           list(self.instance.secondary_nodes))
2547     return env, nl, nl
2548
2549   def CheckPrereq(self):
2550     """Check prerequisites.
2551
2552     This checks that the instance is in the cluster and is not running.
2553
2554     """
2555     instance = self.cfg.GetInstanceInfo(
2556       self.cfg.ExpandInstanceName(self.op.instance_name))
2557     if instance is None:
2558       raise errors.OpPrereqError("Instance '%s' not known" %
2559                                  self.op.instance_name)
2560     if instance.status != "down":
2561       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2562                                  self.op.instance_name)
2563     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2564     if remote_info:
2565       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2566                                  (self.op.instance_name,
2567                                   instance.primary_node))
2568     self.instance = instance
2569
2570     # new name verification
2571     name_info = utils.HostInfo(self.op.new_name)
2572
2573     self.op.new_name = new_name = name_info.name
2574     instance_list = self.cfg.GetInstanceList()
2575     if new_name in instance_list:
2576       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2577                                  new_name)
2578
2579     if not getattr(self.op, "ignore_ip", False):
2580       command = ["fping", "-q", name_info.ip]
2581       result = utils.RunCmd(command)
2582       if not result.failed:
2583         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2584                                    (name_info.ip, new_name))
2585
2586
2587   def Exec(self, feedback_fn):
2588     """Reinstall the instance.
2589
2590     """
2591     inst = self.instance
2592     old_name = inst.name
2593
2594     self.cfg.RenameInstance(inst.name, self.op.new_name)
2595
2596     # re-read the instance from the configuration after rename
2597     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2598
2599     _StartInstanceDisks(self.cfg, inst, None)
2600     try:
2601       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2602                                           "sda", "sdb"):
2603         msg = ("Could run OS rename script for instance %s on node %s (but the"
2604                " instance has been renamed in Ganeti)" %
2605                (inst.name, inst.primary_node))
2606         logger.Error(msg)
2607     finally:
2608       _ShutdownInstanceDisks(inst, self.cfg)
2609
2610
2611 class LURemoveInstance(LogicalUnit):
2612   """Remove an instance.
2613
2614   """
2615   HPATH = "instance-remove"
2616   HTYPE = constants.HTYPE_INSTANCE
2617   _OP_REQP = ["instance_name", "ignore_failures"]
2618
2619   def BuildHooksEnv(self):
2620     """Build hooks env.
2621
2622     This runs on master, primary and secondary nodes of the instance.
2623
2624     """
2625     env = _BuildInstanceHookEnvByObject(self.instance)
2626     nl = [self.sstore.GetMasterNode()]
2627     return env, nl, nl
2628
2629   def CheckPrereq(self):
2630     """Check prerequisites.
2631
2632     This checks that the instance is in the cluster.
2633
2634     """
2635     instance = self.cfg.GetInstanceInfo(
2636       self.cfg.ExpandInstanceName(self.op.instance_name))
2637     if instance is None:
2638       raise errors.OpPrereqError("Instance '%s' not known" %
2639                                  self.op.instance_name)
2640     self.instance = instance
2641
2642   def Exec(self, feedback_fn):
2643     """Remove the instance.
2644
2645     """
2646     instance = self.instance
2647     logger.Info("shutting down instance %s on node %s" %
2648                 (instance.name, instance.primary_node))
2649
2650     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2651       if self.op.ignore_failures:
2652         feedback_fn("Warning: can't shutdown instance")
2653       else:
2654         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2655                                  (instance.name, instance.primary_node))
2656
2657     logger.Info("removing block devices for instance %s" % instance.name)
2658
2659     if not _RemoveDisks(instance, self.cfg):
2660       if self.op.ignore_failures:
2661         feedback_fn("Warning: can't remove instance's disks")
2662       else:
2663         raise errors.OpExecError("Can't remove instance's disks")
2664
2665     logger.Info("removing instance %s out of cluster config" % instance.name)
2666
2667     self.cfg.RemoveInstance(instance.name)
2668
2669
2670 class LUQueryInstances(NoHooksLU):
2671   """Logical unit for querying instances.
2672
2673   """
2674   _OP_REQP = ["output_fields", "names"]
2675
2676   def CheckPrereq(self):
2677     """Check prerequisites.
2678
2679     This checks that the fields required are valid output fields.
2680
2681     """
2682     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2683     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2684                                "admin_state", "admin_ram",
2685                                "disk_template", "ip", "mac", "bridge",
2686                                "sda_size", "sdb_size", "vcpus"],
2687                        dynamic=self.dynamic_fields,
2688                        selected=self.op.output_fields)
2689
2690     self.wanted = _GetWantedInstances(self, self.op.names)
2691
2692   def Exec(self, feedback_fn):
2693     """Computes the list of nodes and their attributes.
2694
2695     """
2696     instance_names = self.wanted
2697     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2698                      in instance_names]
2699
2700     # begin data gathering
2701
2702     nodes = frozenset([inst.primary_node for inst in instance_list])
2703
2704     bad_nodes = []
2705     if self.dynamic_fields.intersection(self.op.output_fields):
2706       live_data = {}
2707       node_data = rpc.call_all_instances_info(nodes)
2708       for name in nodes:
2709         result = node_data[name]
2710         if result:
2711           live_data.update(result)
2712         elif result == False:
2713           bad_nodes.append(name)
2714         # else no instance is alive
2715     else:
2716       live_data = dict([(name, {}) for name in instance_names])
2717
2718     # end data gathering
2719
2720     output = []
2721     for instance in instance_list:
2722       iout = []
2723       for field in self.op.output_fields:
2724         if field == "name":
2725           val = instance.name
2726         elif field == "os":
2727           val = instance.os
2728         elif field == "pnode":
2729           val = instance.primary_node
2730         elif field == "snodes":
2731           val = list(instance.secondary_nodes)
2732         elif field == "admin_state":
2733           val = (instance.status != "down")
2734         elif field == "oper_state":
2735           if instance.primary_node in bad_nodes:
2736             val = None
2737           else:
2738             val = bool(live_data.get(instance.name))
2739         elif field == "status":
2740           if instance.primary_node in bad_nodes:
2741             val = "ERROR_nodedown"
2742           else:
2743             running = bool(live_data.get(instance.name))
2744             if running:
2745               if instance.status != "down":
2746                 val = "running"
2747               else:
2748                 val = "ERROR_up"
2749             else:
2750               if instance.status != "down":
2751                 val = "ERROR_down"
2752               else:
2753                 val = "ADMIN_down"
2754         elif field == "admin_ram":
2755           val = instance.memory
2756         elif field == "oper_ram":
2757           if instance.primary_node in bad_nodes:
2758             val = None
2759           elif instance.name in live_data:
2760             val = live_data[instance.name].get("memory", "?")
2761           else:
2762             val = "-"
2763         elif field == "disk_template":
2764           val = instance.disk_template
2765         elif field == "ip":
2766           val = instance.nics[0].ip
2767         elif field == "bridge":
2768           val = instance.nics[0].bridge
2769         elif field == "mac":
2770           val = instance.nics[0].mac
2771         elif field == "sda_size" or field == "sdb_size":
2772           disk = instance.FindDisk(field[:3])
2773           if disk is None:
2774             val = None
2775           else:
2776             val = disk.size
2777         elif field == "vcpus":
2778           val = instance.vcpus
2779         else:
2780           raise errors.ParameterError(field)
2781         iout.append(val)
2782       output.append(iout)
2783
2784     return output
2785
2786
2787 class LUFailoverInstance(LogicalUnit):
2788   """Failover an instance.
2789
2790   """
2791   HPATH = "instance-failover"
2792   HTYPE = constants.HTYPE_INSTANCE
2793   _OP_REQP = ["instance_name", "ignore_consistency"]
2794
2795   def BuildHooksEnv(self):
2796     """Build hooks env.
2797
2798     This runs on master, primary and secondary nodes of the instance.
2799
2800     """
2801     env = {
2802       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2803       }
2804     env.update(_BuildInstanceHookEnvByObject(self.instance))
2805     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2806     return env, nl, nl
2807
2808   def CheckPrereq(self):
2809     """Check prerequisites.
2810
2811     This checks that the instance is in the cluster.
2812
2813     """
2814     instance = self.cfg.GetInstanceInfo(
2815       self.cfg.ExpandInstanceName(self.op.instance_name))
2816     if instance is None:
2817       raise errors.OpPrereqError("Instance '%s' not known" %
2818                                  self.op.instance_name)
2819
2820     if instance.disk_template not in constants.DTS_NET_MIRROR:
2821       raise errors.OpPrereqError("Instance's disk layout is not"
2822                                  " network mirrored, cannot failover.")
2823
2824     secondary_nodes = instance.secondary_nodes
2825     if not secondary_nodes:
2826       raise errors.ProgrammerError("no secondary node but using "
2827                                    "DT_REMOTE_RAID1 template")
2828
2829     target_node = secondary_nodes[0]
2830     # check memory requirements on the secondary node
2831     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2832                          instance.name, instance.memory)
2833
2834     # check bridge existance
2835     brlist = [nic.bridge for nic in instance.nics]
2836     if not rpc.call_bridges_exist(target_node, brlist):
2837       raise errors.OpPrereqError("One or more target bridges %s does not"
2838                                  " exist on destination node '%s'" %
2839                                  (brlist, target_node))
2840
2841     self.instance = instance
2842
2843   def Exec(self, feedback_fn):
2844     """Failover an instance.
2845
2846     The failover is done by shutting it down on its present node and
2847     starting it on the secondary.
2848
2849     """
2850     instance = self.instance
2851
2852     source_node = instance.primary_node
2853     target_node = instance.secondary_nodes[0]
2854
2855     feedback_fn("* checking disk consistency between source and target")
2856     for dev in instance.disks:
2857       # for remote_raid1, these are md over drbd
2858       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2859         if instance.status == "up" and not self.op.ignore_consistency:
2860           raise errors.OpExecError("Disk %s is degraded on target node,"
2861                                    " aborting failover." % dev.iv_name)
2862
2863     feedback_fn("* shutting down instance on source node")
2864     logger.Info("Shutting down instance %s on node %s" %
2865                 (instance.name, source_node))
2866
2867     if not rpc.call_instance_shutdown(source_node, instance):
2868       if self.op.ignore_consistency:
2869         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2870                      " anyway. Please make sure node %s is down"  %
2871                      (instance.name, source_node, source_node))
2872       else:
2873         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2874                                  (instance.name, source_node))
2875
2876     feedback_fn("* deactivating the instance's disks on source node")
2877     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2878       raise errors.OpExecError("Can't shut down the instance's disks.")
2879
2880     instance.primary_node = target_node
2881     # distribute new instance config to the other nodes
2882     self.cfg.Update(instance)
2883
2884     # Only start the instance if it's marked as up
2885     if instance.status == "up":
2886       feedback_fn("* activating the instance's disks on target node")
2887       logger.Info("Starting instance %s on node %s" %
2888                   (instance.name, target_node))
2889
2890       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2891                                                ignore_secondaries=True)
2892       if not disks_ok:
2893         _ShutdownInstanceDisks(instance, self.cfg)
2894         raise errors.OpExecError("Can't activate the instance's disks")
2895
2896       feedback_fn("* starting the instance on the target node")
2897       if not rpc.call_instance_start(target_node, instance, None):
2898         _ShutdownInstanceDisks(instance, self.cfg)
2899         raise errors.OpExecError("Could not start instance %s on node %s." %
2900                                  (instance.name, target_node))
2901
2902
2903 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2904   """Create a tree of block devices on the primary node.
2905
2906   This always creates all devices.
2907
2908   """
2909   if device.children:
2910     for child in device.children:
2911       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2912         return False
2913
2914   cfg.SetDiskID(device, node)
2915   new_id = rpc.call_blockdev_create(node, device, device.size,
2916                                     instance.name, True, info)
2917   if not new_id:
2918     return False
2919   if device.physical_id is None:
2920     device.physical_id = new_id
2921   return True
2922
2923
2924 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2925   """Create a tree of block devices on a secondary node.
2926
2927   If this device type has to be created on secondaries, create it and
2928   all its children.
2929
2930   If not, just recurse to children keeping the same 'force' value.
2931
2932   """
2933   if device.CreateOnSecondary():
2934     force = True
2935   if device.children:
2936     for child in device.children:
2937       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2938                                         child, force, info):
2939         return False
2940
2941   if not force:
2942     return True
2943   cfg.SetDiskID(device, node)
2944   new_id = rpc.call_blockdev_create(node, device, device.size,
2945                                     instance.name, False, info)
2946   if not new_id:
2947     return False
2948   if device.physical_id is None:
2949     device.physical_id = new_id
2950   return True
2951
2952
2953 def _GenerateUniqueNames(cfg, exts):
2954   """Generate a suitable LV name.
2955
2956   This will generate a logical volume name for the given instance.
2957
2958   """
2959   results = []
2960   for val in exts:
2961     new_id = cfg.GenerateUniqueID()
2962     results.append("%s%s" % (new_id, val))
2963   return results
2964
2965
2966 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2967   """Generate a drbd device complete with its children.
2968
2969   """
2970   port = cfg.AllocatePort()
2971   vgname = cfg.GetVGName()
2972   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2973                           logical_id=(vgname, names[0]))
2974   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2975                           logical_id=(vgname, names[1]))
2976   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2977                           logical_id = (primary, secondary, port),
2978                           children = [dev_data, dev_meta])
2979   return drbd_dev
2980
2981
2982 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2983   """Generate a drbd8 device complete with its children.
2984
2985   """
2986   port = cfg.AllocatePort()
2987   vgname = cfg.GetVGName()
2988   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2989                           logical_id=(vgname, names[0]))
2990   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2991                           logical_id=(vgname, names[1]))
2992   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2993                           logical_id = (primary, secondary, port),
2994                           children = [dev_data, dev_meta],
2995                           iv_name=iv_name)
2996   return drbd_dev
2997
2998 def _GenerateDiskTemplate(cfg, template_name,
2999                           instance_name, primary_node,
3000                           secondary_nodes, disk_sz, swap_sz):
3001   """Generate the entire disk layout for a given template type.
3002
3003   """
3004   #TODO: compute space requirements
3005
3006   vgname = cfg.GetVGName()
3007   if template_name == constants.DT_DISKLESS:
3008     disks = []
3009   elif template_name == constants.DT_PLAIN:
3010     if len(secondary_nodes) != 0:
3011       raise errors.ProgrammerError("Wrong template configuration")
3012
3013     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3014     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3015                            logical_id=(vgname, names[0]),
3016                            iv_name = "sda")
3017     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3018                            logical_id=(vgname, names[1]),
3019                            iv_name = "sdb")
3020     disks = [sda_dev, sdb_dev]
3021   elif template_name == constants.DT_LOCAL_RAID1:
3022     if len(secondary_nodes) != 0:
3023       raise errors.ProgrammerError("Wrong template configuration")
3024
3025
3026     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3027                                        ".sdb_m1", ".sdb_m2"])
3028     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3029                               logical_id=(vgname, names[0]))
3030     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3031                               logical_id=(vgname, names[1]))
3032     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3033                               size=disk_sz,
3034                               children = [sda_dev_m1, sda_dev_m2])
3035     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3036                               logical_id=(vgname, names[2]))
3037     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3038                               logical_id=(vgname, names[3]))
3039     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3040                               size=swap_sz,
3041                               children = [sdb_dev_m1, sdb_dev_m2])
3042     disks = [md_sda_dev, md_sdb_dev]
3043   elif template_name == constants.DT_REMOTE_RAID1:
3044     if len(secondary_nodes) != 1:
3045       raise errors.ProgrammerError("Wrong template configuration")
3046     remote_node = secondary_nodes[0]
3047     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3048                                        ".sdb_data", ".sdb_meta"])
3049     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3050                                          disk_sz, names[0:2])
3051     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3052                               children = [drbd_sda_dev], size=disk_sz)
3053     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3054                                          swap_sz, names[2:4])
3055     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3056                               children = [drbd_sdb_dev], size=swap_sz)
3057     disks = [md_sda_dev, md_sdb_dev]
3058   elif template_name == constants.DT_DRBD8:
3059     if len(secondary_nodes) != 1:
3060       raise errors.ProgrammerError("Wrong template configuration")
3061     remote_node = secondary_nodes[0]
3062     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3063                                        ".sdb_data", ".sdb_meta"])
3064     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3065                                          disk_sz, names[0:2], "sda")
3066     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3067                                          swap_sz, names[2:4], "sdb")
3068     disks = [drbd_sda_dev, drbd_sdb_dev]
3069   else:
3070     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3071   return disks
3072
3073
3074 def _GetInstanceInfoText(instance):
3075   """Compute that text that should be added to the disk's metadata.
3076
3077   """
3078   return "originstname+%s" % instance.name
3079
3080
3081 def _CreateDisks(cfg, instance):
3082   """Create all disks for an instance.
3083
3084   This abstracts away some work from AddInstance.
3085
3086   Args:
3087     instance: the instance object
3088
3089   Returns:
3090     True or False showing the success of the creation process
3091
3092   """
3093   info = _GetInstanceInfoText(instance)
3094
3095   for device in instance.disks:
3096     logger.Info("creating volume %s for instance %s" %
3097               (device.iv_name, instance.name))
3098     #HARDCODE
3099     for secondary_node in instance.secondary_nodes:
3100       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3101                                         device, False, info):
3102         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3103                      (device.iv_name, device, secondary_node))
3104         return False
3105     #HARDCODE
3106     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3107                                     instance, device, info):
3108       logger.Error("failed to create volume %s on primary!" %
3109                    device.iv_name)
3110       return False
3111   return True
3112
3113
3114 def _RemoveDisks(instance, cfg):
3115   """Remove all disks for an instance.
3116
3117   This abstracts away some work from `AddInstance()` and
3118   `RemoveInstance()`. Note that in case some of the devices couldn't
3119   be removed, the removal will continue with the other ones (compare
3120   with `_CreateDisks()`).
3121
3122   Args:
3123     instance: the instance object
3124
3125   Returns:
3126     True or False showing the success of the removal proces
3127
3128   """
3129   logger.Info("removing block devices for instance %s" % instance.name)
3130
3131   result = True
3132   for device in instance.disks:
3133     for node, disk in device.ComputeNodeTree(instance.primary_node):
3134       cfg.SetDiskID(disk, node)
3135       if not rpc.call_blockdev_remove(node, disk):
3136         logger.Error("could not remove block device %s on node %s,"
3137                      " continuing anyway" %
3138                      (device.iv_name, node))
3139         result = False
3140   return result
3141
3142
3143 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3144   """Compute disk size requirements in the volume group
3145
3146   This is currently hard-coded for the two-drive layout.
3147
3148   """
3149   # Required free disk space as a function of disk and swap space
3150   req_size_dict = {
3151     constants.DT_DISKLESS: None,
3152     constants.DT_PLAIN: disk_size + swap_size,
3153     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3154     # 256 MB are added for drbd metadata, 128MB for each drbd device
3155     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3156     constants.DT_DRBD8: disk_size + swap_size + 256,
3157   }
3158
3159   if disk_template not in req_size_dict:
3160     raise errors.ProgrammerError("Disk template '%s' size requirement"
3161                                  " is unknown" %  disk_template)
3162
3163   return req_size_dict[disk_template]
3164
3165
3166 class LUCreateInstance(LogicalUnit):
3167   """Create an instance.
3168
3169   """
3170   HPATH = "instance-add"
3171   HTYPE = constants.HTYPE_INSTANCE
3172   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3173               "disk_template", "swap_size", "mode", "start", "vcpus",
3174               "wait_for_sync", "ip_check", "mac"]
3175
3176   def _RunAllocator(self):
3177     """Run the allocator based on input opcode.
3178
3179     """
3180     disks = [{"size": self.op.disk_size, "mode": "w"},
3181              {"size": self.op.swap_size, "mode": "w"}]
3182     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3183              "bridge": self.op.bridge}]
3184     ial = IAllocator(self.cfg, self.sstore,
3185                      mode=constants.IALLOCATOR_MODE_ALLOC,
3186                      name=self.op.instance_name,
3187                      disk_template=self.op.disk_template,
3188                      tags=[],
3189                      os=self.op.os_type,
3190                      vcpus=self.op.vcpus,
3191                      mem_size=self.op.mem_size,
3192                      disks=disks,
3193                      nics=nics,
3194                      )
3195
3196     ial.Run(self.op.iallocator)
3197
3198     if not ial.success:
3199       raise errors.OpPrereqError("Can't compute nodes using"
3200                                  " iallocator '%s': %s" % (self.op.iallocator,
3201                                                            ial.info))
3202     if len(ial.nodes) != ial.required_nodes:
3203       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3204                                  " of nodes (%s), required %s" %
3205                                  (len(ial.nodes), ial.required_nodes))
3206     self.op.pnode = ial.nodes[0]
3207     logger.ToStdout("Selected nodes for the instance: %s" %
3208                     (", ".join(ial.nodes),))
3209     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3210                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3211     if ial.required_nodes == 2:
3212       self.op.snode = ial.nodes[1]
3213
3214   def BuildHooksEnv(self):
3215     """Build hooks env.
3216
3217     This runs on master, primary and secondary nodes of the instance.
3218
3219     """
3220     env = {
3221       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3222       "INSTANCE_DISK_SIZE": self.op.disk_size,
3223       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3224       "INSTANCE_ADD_MODE": self.op.mode,
3225       }
3226     if self.op.mode == constants.INSTANCE_IMPORT:
3227       env["INSTANCE_SRC_NODE"] = self.op.src_node
3228       env["INSTANCE_SRC_PATH"] = self.op.src_path
3229       env["INSTANCE_SRC_IMAGE"] = self.src_image
3230
3231     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3232       primary_node=self.op.pnode,
3233       secondary_nodes=self.secondaries,
3234       status=self.instance_status,
3235       os_type=self.op.os_type,
3236       memory=self.op.mem_size,
3237       vcpus=self.op.vcpus,
3238       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3239     ))
3240
3241     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3242           self.secondaries)
3243     return env, nl, nl
3244
3245
3246   def CheckPrereq(self):
3247     """Check prerequisites.
3248
3249     """
3250     # set optional parameters to none if they don't exist
3251     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3252                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3253                  "vnc_bind_address"]:
3254       if not hasattr(self.op, attr):
3255         setattr(self.op, attr, None)
3256
3257     if self.op.mode not in (constants.INSTANCE_CREATE,
3258                             constants.INSTANCE_IMPORT):
3259       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3260                                  self.op.mode)
3261
3262     if self.op.mode == constants.INSTANCE_IMPORT:
3263       src_node = getattr(self.op, "src_node", None)
3264       src_path = getattr(self.op, "src_path", None)
3265       if src_node is None or src_path is None:
3266         raise errors.OpPrereqError("Importing an instance requires source"
3267                                    " node and path options")
3268       src_node_full = self.cfg.ExpandNodeName(src_node)
3269       if src_node_full is None:
3270         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3271       self.op.src_node = src_node = src_node_full
3272
3273       if not os.path.isabs(src_path):
3274         raise errors.OpPrereqError("The source path must be absolute")
3275
3276       export_info = rpc.call_export_info(src_node, src_path)
3277
3278       if not export_info:
3279         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3280
3281       if not export_info.has_section(constants.INISECT_EXP):
3282         raise errors.ProgrammerError("Corrupted export config")
3283
3284       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3285       if (int(ei_version) != constants.EXPORT_VERSION):
3286         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3287                                    (ei_version, constants.EXPORT_VERSION))
3288
3289       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3290         raise errors.OpPrereqError("Can't import instance with more than"
3291                                    " one data disk")
3292
3293       # FIXME: are the old os-es, disk sizes, etc. useful?
3294       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3295       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3296                                                          'disk0_dump'))
3297       self.src_image = diskimage
3298     else: # INSTANCE_CREATE
3299       if getattr(self.op, "os_type", None) is None:
3300         raise errors.OpPrereqError("No guest OS specified")
3301
3302     #### instance parameters check
3303
3304     # disk template and mirror node verification
3305     if self.op.disk_template not in constants.DISK_TEMPLATES:
3306       raise errors.OpPrereqError("Invalid disk template name")
3307
3308     # instance name verification
3309     hostname1 = utils.HostInfo(self.op.instance_name)
3310
3311     self.op.instance_name = instance_name = hostname1.name
3312     instance_list = self.cfg.GetInstanceList()
3313     if instance_name in instance_list:
3314       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3315                                  instance_name)
3316
3317     # ip validity checks
3318     ip = getattr(self.op, "ip", None)
3319     if ip is None or ip.lower() == "none":
3320       inst_ip = None
3321     elif ip.lower() == "auto":
3322       inst_ip = hostname1.ip
3323     else:
3324       if not utils.IsValidIP(ip):
3325         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3326                                    " like a valid IP" % ip)
3327       inst_ip = ip
3328     self.inst_ip = self.op.ip = inst_ip
3329
3330     if self.op.start and not self.op.ip_check:
3331       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3332                                  " adding an instance in start mode")
3333
3334     if self.op.ip_check:
3335       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3336         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3337                                    (hostname1.ip, instance_name))
3338
3339     # MAC address verification
3340     if self.op.mac != "auto":
3341       if not utils.IsValidMac(self.op.mac.lower()):
3342         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3343                                    self.op.mac)
3344
3345     # bridge verification
3346     bridge = getattr(self.op, "bridge", None)
3347     if bridge is None:
3348       self.op.bridge = self.cfg.GetDefBridge()
3349     else:
3350       self.op.bridge = bridge
3351
3352     # boot order verification
3353     if self.op.hvm_boot_order is not None:
3354       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3355         raise errors.OpPrereqError("invalid boot order specified,"
3356                                    " must be one or more of [acdn]")
3357     #### allocator run
3358
3359     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3360       raise errors.OpPrereqError("One and only one of iallocator and primary"
3361                                  " node must be given")
3362
3363     if self.op.iallocator is not None:
3364       self._RunAllocator()
3365
3366     #### node related checks
3367
3368     # check primary node
3369     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3370     if pnode is None:
3371       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3372                                  self.op.pnode)
3373     self.op.pnode = pnode.name
3374     self.pnode = pnode
3375     self.secondaries = []
3376
3377     # mirror node verification
3378     if self.op.disk_template in constants.DTS_NET_MIRROR:
3379       if getattr(self.op, "snode", None) is None:
3380         raise errors.OpPrereqError("The networked disk templates need"
3381                                    " a mirror node")
3382
3383       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3384       if snode_name is None:
3385         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3386                                    self.op.snode)
3387       elif snode_name == pnode.name:
3388         raise errors.OpPrereqError("The secondary node cannot be"
3389                                    " the primary node.")
3390       self.secondaries.append(snode_name)
3391
3392     req_size = _ComputeDiskSize(self.op.disk_template,
3393                                 self.op.disk_size, self.op.swap_size)
3394
3395     # Check lv size requirements
3396     if req_size is not None:
3397       nodenames = [pnode.name] + self.secondaries
3398       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3399       for node in nodenames:
3400         info = nodeinfo.get(node, None)
3401         if not info:
3402           raise errors.OpPrereqError("Cannot get current information"
3403                                      " from node '%s'" % node)
3404         vg_free = info.get('vg_free', None)
3405         if not isinstance(vg_free, int):
3406           raise errors.OpPrereqError("Can't compute free disk space on"
3407                                      " node %s" % node)
3408         if req_size > info['vg_free']:
3409           raise errors.OpPrereqError("Not enough disk space on target node %s."
3410                                      " %d MB available, %d MB required" %
3411                                      (node, info['vg_free'], req_size))
3412
3413     # os verification
3414     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3415     if not os_obj:
3416       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3417                                  " primary node"  % self.op.os_type)
3418
3419     if self.op.kernel_path == constants.VALUE_NONE:
3420       raise errors.OpPrereqError("Can't set instance kernel to none")
3421
3422
3423     # bridge check on primary node
3424     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3425       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3426                                  " destination node '%s'" %
3427                                  (self.op.bridge, pnode.name))
3428
3429     # memory check on primary node
3430     if self.op.start:
3431       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3432                            "creating instance %s" % self.op.instance_name,
3433                            self.op.mem_size)
3434
3435     # hvm_cdrom_image_path verification
3436     if self.op.hvm_cdrom_image_path is not None:
3437       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3438         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3439                                    " be an absolute path or None, not %s" %
3440                                    self.op.hvm_cdrom_image_path)
3441       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3442         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3443                                    " regular file or a symlink pointing to"
3444                                    " an existing regular file, not %s" %
3445                                    self.op.hvm_cdrom_image_path)
3446
3447     # vnc_bind_address verification
3448     if self.op.vnc_bind_address is not None:
3449       if not utils.IsValidIP(self.op.vnc_bind_address):
3450         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3451                                    " like a valid IP address" %
3452                                    self.op.vnc_bind_address)
3453
3454     if self.op.start:
3455       self.instance_status = 'up'
3456     else:
3457       self.instance_status = 'down'
3458
3459   def Exec(self, feedback_fn):
3460     """Create and add the instance to the cluster.
3461
3462     """
3463     instance = self.op.instance_name
3464     pnode_name = self.pnode.name
3465
3466     if self.op.mac == "auto":
3467       mac_address = self.cfg.GenerateMAC()
3468     else:
3469       mac_address = self.op.mac
3470
3471     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3472     if self.inst_ip is not None:
3473       nic.ip = self.inst_ip
3474
3475     ht_kind = self.sstore.GetHypervisorType()
3476     if ht_kind in constants.HTS_REQ_PORT:
3477       network_port = self.cfg.AllocatePort()
3478     else:
3479       network_port = None
3480
3481     if self.op.vnc_bind_address is None:
3482       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3483
3484     disks = _GenerateDiskTemplate(self.cfg,
3485                                   self.op.disk_template,
3486                                   instance, pnode_name,
3487                                   self.secondaries, self.op.disk_size,
3488                                   self.op.swap_size)
3489
3490     iobj = objects.Instance(name=instance, os=self.op.os_type,
3491                             primary_node=pnode_name,
3492                             memory=self.op.mem_size,
3493                             vcpus=self.op.vcpus,
3494                             nics=[nic], disks=disks,
3495                             disk_template=self.op.disk_template,
3496                             status=self.instance_status,
3497                             network_port=network_port,
3498                             kernel_path=self.op.kernel_path,
3499                             initrd_path=self.op.initrd_path,
3500                             hvm_boot_order=self.op.hvm_boot_order,
3501                             hvm_acpi=self.op.hvm_acpi,
3502                             hvm_pae=self.op.hvm_pae,
3503                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3504                             vnc_bind_address=self.op.vnc_bind_address,
3505                             )
3506
3507     feedback_fn("* creating instance disks...")
3508     if not _CreateDisks(self.cfg, iobj):
3509       _RemoveDisks(iobj, self.cfg)
3510       raise errors.OpExecError("Device creation failed, reverting...")
3511
3512     feedback_fn("adding instance %s to cluster config" % instance)
3513
3514     self.cfg.AddInstance(iobj)
3515
3516     if self.op.wait_for_sync:
3517       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3518     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3519       # make sure the disks are not degraded (still sync-ing is ok)
3520       time.sleep(15)
3521       feedback_fn("* checking mirrors status")
3522       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3523     else:
3524       disk_abort = False
3525
3526     if disk_abort:
3527       _RemoveDisks(iobj, self.cfg)
3528       self.cfg.RemoveInstance(iobj.name)
3529       raise errors.OpExecError("There are some degraded disks for"
3530                                " this instance")
3531
3532     feedback_fn("creating os for instance %s on node %s" %
3533                 (instance, pnode_name))
3534
3535     if iobj.disk_template != constants.DT_DISKLESS:
3536       if self.op.mode == constants.INSTANCE_CREATE:
3537         feedback_fn("* running the instance OS create scripts...")
3538         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3539           raise errors.OpExecError("could not add os for instance %s"
3540                                    " on node %s" %
3541                                    (instance, pnode_name))
3542
3543       elif self.op.mode == constants.INSTANCE_IMPORT:
3544         feedback_fn("* running the instance OS import scripts...")
3545         src_node = self.op.src_node
3546         src_image = self.src_image
3547         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3548                                                 src_node, src_image):
3549           raise errors.OpExecError("Could not import os for instance"
3550                                    " %s on node %s" %
3551                                    (instance, pnode_name))
3552       else:
3553         # also checked in the prereq part
3554         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3555                                      % self.op.mode)
3556
3557     if self.op.start:
3558       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3559       feedback_fn("* starting instance...")
3560       if not rpc.call_instance_start(pnode_name, iobj, None):
3561         raise errors.OpExecError("Could not start instance")
3562
3563
3564 class LUConnectConsole(NoHooksLU):
3565   """Connect to an instance's console.
3566
3567   This is somewhat special in that it returns the command line that
3568   you need to run on the master node in order to connect to the
3569   console.
3570
3571   """
3572   _OP_REQP = ["instance_name"]
3573
3574   def CheckPrereq(self):
3575     """Check prerequisites.
3576
3577     This checks that the instance is in the cluster.
3578
3579     """
3580     instance = self.cfg.GetInstanceInfo(
3581       self.cfg.ExpandInstanceName(self.op.instance_name))
3582     if instance is None:
3583       raise errors.OpPrereqError("Instance '%s' not known" %
3584                                  self.op.instance_name)
3585     self.instance = instance
3586
3587   def Exec(self, feedback_fn):
3588     """Connect to the console of an instance
3589
3590     """
3591     instance = self.instance
3592     node = instance.primary_node
3593
3594     node_insts = rpc.call_instance_list([node])[node]
3595     if node_insts is False:
3596       raise errors.OpExecError("Can't connect to node %s." % node)
3597
3598     if instance.name not in node_insts:
3599       raise errors.OpExecError("Instance %s is not running." % instance.name)
3600
3601     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3602
3603     hyper = hypervisor.GetHypervisor()
3604     console_cmd = hyper.GetShellCommandForConsole(instance)
3605     # build ssh cmdline
3606     argv = ["ssh", "-q", "-t"]
3607     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3608     argv.extend(ssh.BATCH_MODE_OPTS)
3609     argv.append(node)
3610     argv.append(console_cmd)
3611     return "ssh", argv
3612
3613
3614 class LUAddMDDRBDComponent(LogicalUnit):
3615   """Adda new mirror member to an instance's disk.
3616
3617   """
3618   HPATH = "mirror-add"
3619   HTYPE = constants.HTYPE_INSTANCE
3620   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3621
3622   def BuildHooksEnv(self):
3623     """Build hooks env.
3624
3625     This runs on the master, the primary and all the secondaries.
3626
3627     """
3628     env = {
3629       "NEW_SECONDARY": self.op.remote_node,
3630       "DISK_NAME": self.op.disk_name,
3631       }
3632     env.update(_BuildInstanceHookEnvByObject(self.instance))
3633     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3634           self.op.remote_node,] + list(self.instance.secondary_nodes)
3635     return env, nl, nl
3636
3637   def CheckPrereq(self):
3638     """Check prerequisites.
3639
3640     This checks that the instance is in the cluster.
3641
3642     """
3643     instance = self.cfg.GetInstanceInfo(
3644       self.cfg.ExpandInstanceName(self.op.instance_name))
3645     if instance is None:
3646       raise errors.OpPrereqError("Instance '%s' not known" %
3647                                  self.op.instance_name)
3648     self.instance = instance
3649
3650     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3651     if remote_node is None:
3652       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3653     self.remote_node = remote_node
3654
3655     if remote_node == instance.primary_node:
3656       raise errors.OpPrereqError("The specified node is the primary node of"
3657                                  " the instance.")
3658
3659     if instance.disk_template != constants.DT_REMOTE_RAID1:
3660       raise errors.OpPrereqError("Instance's disk layout is not"
3661                                  " remote_raid1.")
3662     for disk in instance.disks:
3663       if disk.iv_name == self.op.disk_name:
3664         break
3665     else:
3666       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3667                                  " instance." % self.op.disk_name)
3668     if len(disk.children) > 1:
3669       raise errors.OpPrereqError("The device already has two slave devices."
3670                                  " This would create a 3-disk raid1 which we"
3671                                  " don't allow.")
3672     self.disk = disk
3673
3674   def Exec(self, feedback_fn):
3675     """Add the mirror component
3676
3677     """
3678     disk = self.disk
3679     instance = self.instance
3680
3681     remote_node = self.remote_node
3682     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3683     names = _GenerateUniqueNames(self.cfg, lv_names)
3684     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3685                                      remote_node, disk.size, names)
3686
3687     logger.Info("adding new mirror component on secondary")
3688     #HARDCODE
3689     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3690                                       new_drbd, False,
3691                                       _GetInstanceInfoText(instance)):
3692       raise errors.OpExecError("Failed to create new component on secondary"
3693                                " node %s" % remote_node)
3694
3695     logger.Info("adding new mirror component on primary")
3696     #HARDCODE
3697     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3698                                     instance, new_drbd,
3699                                     _GetInstanceInfoText(instance)):
3700       # remove secondary dev
3701       self.cfg.SetDiskID(new_drbd, remote_node)
3702       rpc.call_blockdev_remove(remote_node, new_drbd)
3703       raise errors.OpExecError("Failed to create volume on primary")
3704
3705     # the device exists now
3706     # call the primary node to add the mirror to md
3707     logger.Info("adding new mirror component to md")
3708     if not rpc.call_blockdev_addchildren(instance.primary_node,
3709                                          disk, [new_drbd]):
3710       logger.Error("Can't add mirror compoment to md!")
3711       self.cfg.SetDiskID(new_drbd, remote_node)
3712       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3713         logger.Error("Can't rollback on secondary")
3714       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3715       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3716         logger.Error("Can't rollback on primary")
3717       raise errors.OpExecError("Can't add mirror component to md array")
3718
3719     disk.children.append(new_drbd)
3720
3721     self.cfg.AddInstance(instance)
3722
3723     _WaitForSync(self.cfg, instance, self.proc)
3724
3725     return 0
3726
3727
3728 class LURemoveMDDRBDComponent(LogicalUnit):
3729   """Remove a component from a remote_raid1 disk.
3730
3731   """
3732   HPATH = "mirror-remove"
3733   HTYPE = constants.HTYPE_INSTANCE
3734   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3735
3736   def BuildHooksEnv(self):
3737     """Build hooks env.
3738
3739     This runs on the master, the primary and all the secondaries.
3740
3741     """
3742     env = {
3743       "DISK_NAME": self.op.disk_name,
3744       "DISK_ID": self.op.disk_id,
3745       "OLD_SECONDARY": self.old_secondary,
3746       }
3747     env.update(_BuildInstanceHookEnvByObject(self.instance))
3748     nl = [self.sstore.GetMasterNode(),
3749           self.instance.primary_node] + list(self.instance.secondary_nodes)
3750     return env, nl, nl
3751
3752   def CheckPrereq(self):
3753     """Check prerequisites.
3754
3755     This checks that the instance is in the cluster.
3756
3757     """
3758     instance = self.cfg.GetInstanceInfo(
3759       self.cfg.ExpandInstanceName(self.op.instance_name))
3760     if instance is None:
3761       raise errors.OpPrereqError("Instance '%s' not known" %
3762                                  self.op.instance_name)
3763     self.instance = instance
3764
3765     if instance.disk_template != constants.DT_REMOTE_RAID1:
3766       raise errors.OpPrereqError("Instance's disk layout is not"
3767                                  " remote_raid1.")
3768     for disk in instance.disks:
3769       if disk.iv_name == self.op.disk_name:
3770         break
3771     else:
3772       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3773                                  " instance." % self.op.disk_name)
3774     for child in disk.children:
3775       if (child.dev_type == constants.LD_DRBD7 and
3776           child.logical_id[2] == self.op.disk_id):
3777         break
3778     else:
3779       raise errors.OpPrereqError("Can't find the device with this port.")
3780
3781     if len(disk.children) < 2:
3782       raise errors.OpPrereqError("Cannot remove the last component from"
3783                                  " a mirror.")
3784     self.disk = disk
3785     self.child = child
3786     if self.child.logical_id[0] == instance.primary_node:
3787       oid = 1
3788     else:
3789       oid = 0
3790     self.old_secondary = self.child.logical_id[oid]
3791
3792   def Exec(self, feedback_fn):
3793     """Remove the mirror component
3794
3795     """
3796     instance = self.instance
3797     disk = self.disk
3798     child = self.child
3799     logger.Info("remove mirror component")
3800     self.cfg.SetDiskID(disk, instance.primary_node)
3801     if not rpc.call_blockdev_removechildren(instance.primary_node,
3802                                             disk, [child]):
3803       raise errors.OpExecError("Can't remove child from mirror.")
3804
3805     for node in child.logical_id[:2]:
3806       self.cfg.SetDiskID(child, node)
3807       if not rpc.call_blockdev_remove(node, child):
3808         logger.Error("Warning: failed to remove device from node %s,"
3809                      " continuing operation." % node)
3810
3811     disk.children.remove(child)
3812     self.cfg.AddInstance(instance)
3813
3814
3815 class LUReplaceDisks(LogicalUnit):
3816   """Replace the disks of an instance.
3817
3818   """
3819   HPATH = "mirrors-replace"
3820   HTYPE = constants.HTYPE_INSTANCE
3821   _OP_REQP = ["instance_name", "mode", "disks"]
3822
3823   def _RunAllocator(self):
3824     """Compute a new secondary node using an IAllocator.
3825
3826     """
3827     ial = IAllocator(self.cfg, self.sstore,
3828                      mode=constants.IALLOCATOR_MODE_RELOC,
3829                      name=self.op.instance_name,
3830                      relocate_from=[self.sec_node])
3831
3832     ial.Run(self.op.iallocator)
3833
3834     if not ial.success:
3835       raise errors.OpPrereqError("Can't compute nodes using"
3836                                  " iallocator '%s': %s" % (self.op.iallocator,
3837                                                            ial.info))
3838     if len(ial.nodes) != ial.required_nodes:
3839       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3840                                  " of nodes (%s), required %s" %
3841                                  (len(ial.nodes), ial.required_nodes))
3842     self.op.remote_node = ial.nodes[0]
3843     logger.ToStdout("Selected new secondary for the instance: %s" %
3844                     self.op.remote_node)
3845
3846   def BuildHooksEnv(self):
3847     """Build hooks env.
3848
3849     This runs on the master, the primary and all the secondaries.
3850
3851     """
3852     env = {
3853       "MODE": self.op.mode,
3854       "NEW_SECONDARY": self.op.remote_node,
3855       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3856       }
3857     env.update(_BuildInstanceHookEnvByObject(self.instance))
3858     nl = [
3859       self.sstore.GetMasterNode(),
3860       self.instance.primary_node,
3861       ]
3862     if self.op.remote_node is not None:
3863       nl.append(self.op.remote_node)
3864     return env, nl, nl
3865
3866   def CheckPrereq(self):
3867     """Check prerequisites.
3868
3869     This checks that the instance is in the cluster.
3870
3871     """
3872     if not hasattr(self.op, "remote_node"):
3873       self.op.remote_node = None
3874
3875     instance = self.cfg.GetInstanceInfo(
3876       self.cfg.ExpandInstanceName(self.op.instance_name))
3877     if instance is None:
3878       raise errors.OpPrereqError("Instance '%s' not known" %
3879                                  self.op.instance_name)
3880     self.instance = instance
3881     self.op.instance_name = instance.name
3882
3883     if instance.disk_template not in constants.DTS_NET_MIRROR:
3884       raise errors.OpPrereqError("Instance's disk layout is not"
3885                                  " network mirrored.")
3886
3887     if len(instance.secondary_nodes) != 1:
3888       raise errors.OpPrereqError("The instance has a strange layout,"
3889                                  " expected one secondary but found %d" %
3890                                  len(instance.secondary_nodes))
3891
3892     self.sec_node = instance.secondary_nodes[0]
3893
3894     ia_name = getattr(self.op, "iallocator", None)
3895     if ia_name is not None:
3896       if self.op.remote_node is not None:
3897         raise errors.OpPrereqError("Give either the iallocator or the new"
3898                                    " secondary, not both")
3899       self.op.remote_node = self._RunAllocator()
3900
3901     remote_node = self.op.remote_node
3902     if remote_node is not None:
3903       remote_node = self.cfg.ExpandNodeName(remote_node)
3904       if remote_node is None:
3905         raise errors.OpPrereqError("Node '%s' not known" %
3906                                    self.op.remote_node)
3907       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3908     else:
3909       self.remote_node_info = None
3910     if remote_node == instance.primary_node:
3911       raise errors.OpPrereqError("The specified node is the primary node of"
3912                                  " the instance.")
3913     elif remote_node == self.sec_node:
3914       if self.op.mode == constants.REPLACE_DISK_SEC:
3915         # this is for DRBD8, where we can't execute the same mode of
3916         # replacement as for drbd7 (no different port allocated)
3917         raise errors.OpPrereqError("Same secondary given, cannot execute"
3918                                    " replacement")
3919       # the user gave the current secondary, switch to
3920       # 'no-replace-secondary' mode for drbd7
3921       remote_node = None
3922     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3923         self.op.mode != constants.REPLACE_DISK_ALL):
3924       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3925                                  " disks replacement, not individual ones")
3926     if instance.disk_template == constants.DT_DRBD8:
3927       if (self.op.mode == constants.REPLACE_DISK_ALL and
3928           remote_node is not None):
3929         # switch to replace secondary mode
3930         self.op.mode = constants.REPLACE_DISK_SEC
3931
3932       if self.op.mode == constants.REPLACE_DISK_ALL:
3933         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3934                                    " secondary disk replacement, not"
3935                                    " both at once")
3936       elif self.op.mode == constants.REPLACE_DISK_PRI:
3937         if remote_node is not None:
3938           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3939                                      " the secondary while doing a primary"
3940                                      " node disk replacement")
3941         self.tgt_node = instance.primary_node
3942         self.oth_node = instance.secondary_nodes[0]
3943       elif self.op.mode == constants.REPLACE_DISK_SEC:
3944         self.new_node = remote_node # this can be None, in which case
3945                                     # we don't change the secondary
3946         self.tgt_node = instance.secondary_nodes[0]
3947         self.oth_node = instance.primary_node
3948       else:
3949         raise errors.ProgrammerError("Unhandled disk replace mode")
3950
3951     for name in self.op.disks:
3952       if instance.FindDisk(name) is None:
3953         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3954                                    (name, instance.name))
3955     self.op.remote_node = remote_node
3956
3957   def _ExecRR1(self, feedback_fn):
3958     """Replace the disks of an instance.
3959
3960     """
3961     instance = self.instance
3962     iv_names = {}
3963     # start of work
3964     if self.op.remote_node is None:
3965       remote_node = self.sec_node
3966     else:
3967       remote_node = self.op.remote_node
3968     cfg = self.cfg
3969     for dev in instance.disks:
3970       size = dev.size
3971       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3972       names = _GenerateUniqueNames(cfg, lv_names)
3973       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3974                                        remote_node, size, names)
3975       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3976       logger.Info("adding new mirror component on secondary for %s" %
3977                   dev.iv_name)
3978       #HARDCODE
3979       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3980                                         new_drbd, False,
3981                                         _GetInstanceInfoText(instance)):
3982         raise errors.OpExecError("Failed to create new component on secondary"
3983                                  " node %s. Full abort, cleanup manually!" %
3984                                  remote_node)
3985
3986       logger.Info("adding new mirror component on primary")
3987       #HARDCODE
3988       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3989                                       instance, new_drbd,
3990                                       _GetInstanceInfoText(instance)):
3991         # remove secondary dev
3992         cfg.SetDiskID(new_drbd, remote_node)
3993         rpc.call_blockdev_remove(remote_node, new_drbd)
3994         raise errors.OpExecError("Failed to create volume on primary!"
3995                                  " Full abort, cleanup manually!!")
3996
3997       # the device exists now
3998       # call the primary node to add the mirror to md
3999       logger.Info("adding new mirror component to md")
4000       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4001                                            [new_drbd]):
4002         logger.Error("Can't add mirror compoment to md!")
4003         cfg.SetDiskID(new_drbd, remote_node)
4004         if not rpc.call_blockdev_remove(remote_node, new_drbd):
4005           logger.Error("Can't rollback on secondary")
4006         cfg.SetDiskID(new_drbd, instance.primary_node)
4007         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4008           logger.Error("Can't rollback on primary")
4009         raise errors.OpExecError("Full abort, cleanup manually!!")
4010
4011       dev.children.append(new_drbd)
4012       cfg.AddInstance(instance)
4013
4014     # this can fail as the old devices are degraded and _WaitForSync
4015     # does a combined result over all disks, so we don't check its
4016     # return value
4017     _WaitForSync(cfg, instance, self.proc, unlock=True)
4018
4019     # so check manually all the devices
4020     for name in iv_names:
4021       dev, child, new_drbd = iv_names[name]
4022       cfg.SetDiskID(dev, instance.primary_node)
4023       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4024       if is_degr:
4025         raise errors.OpExecError("MD device %s is degraded!" % name)
4026       cfg.SetDiskID(new_drbd, instance.primary_node)
4027       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4028       if is_degr:
4029         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4030
4031     for name in iv_names:
4032       dev, child, new_drbd = iv_names[name]
4033       logger.Info("remove mirror %s component" % name)
4034       cfg.SetDiskID(dev, instance.primary_node)
4035       if not rpc.call_blockdev_removechildren(instance.primary_node,
4036                                               dev, [child]):
4037         logger.Error("Can't remove child from mirror, aborting"
4038                      " *this device cleanup*.\nYou need to cleanup manually!!")
4039         continue
4040
4041       for node in child.logical_id[:2]:
4042         logger.Info("remove child device on %s" % node)
4043         cfg.SetDiskID(child, node)
4044         if not rpc.call_blockdev_remove(node, child):
4045           logger.Error("Warning: failed to remove device from node %s,"
4046                        " continuing operation." % node)
4047
4048       dev.children.remove(child)
4049
4050       cfg.AddInstance(instance)
4051
4052   def _ExecD8DiskOnly(self, feedback_fn):
4053     """Replace a disk on the primary or secondary for dbrd8.
4054
4055     The algorithm for replace is quite complicated:
4056       - for each disk to be replaced:
4057         - create new LVs on the target node with unique names
4058         - detach old LVs from the drbd device
4059         - rename old LVs to name_replaced.<time_t>
4060         - rename new LVs to old LVs
4061         - attach the new LVs (with the old names now) to the drbd device
4062       - wait for sync across all devices
4063       - for each modified disk:
4064         - remove old LVs (which have the name name_replaces.<time_t>)
4065
4066     Failures are not very well handled.
4067
4068     """
4069     steps_total = 6
4070     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4071     instance = self.instance
4072     iv_names = {}
4073     vgname = self.cfg.GetVGName()
4074     # start of work
4075     cfg = self.cfg
4076     tgt_node = self.tgt_node
4077     oth_node = self.oth_node
4078
4079     # Step: check device activation
4080     self.proc.LogStep(1, steps_total, "check device existence")
4081     info("checking volume groups")
4082     my_vg = cfg.GetVGName()
4083     results = rpc.call_vg_list([oth_node, tgt_node])
4084     if not results:
4085       raise errors.OpExecError("Can't list volume groups on the nodes")
4086     for node in oth_node, tgt_node:
4087       res = results.get(node, False)
4088       if not res or my_vg not in res:
4089         raise errors.OpExecError("Volume group '%s' not found on %s" %
4090                                  (my_vg, node))
4091     for dev in instance.disks:
4092       if not dev.iv_name in self.op.disks:
4093         continue
4094       for node in tgt_node, oth_node:
4095         info("checking %s on %s" % (dev.iv_name, node))
4096         cfg.SetDiskID(dev, node)
4097         if not rpc.call_blockdev_find(node, dev):
4098           raise errors.OpExecError("Can't find device %s on node %s" %
4099                                    (dev.iv_name, node))
4100
4101     # Step: check other node consistency
4102     self.proc.LogStep(2, steps_total, "check peer consistency")
4103     for dev in instance.disks:
4104       if not dev.iv_name in self.op.disks:
4105         continue
4106       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4107       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4108                                    oth_node==instance.primary_node):
4109         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4110                                  " to replace disks on this node (%s)" %
4111                                  (oth_node, tgt_node))
4112
4113     # Step: create new storage
4114     self.proc.LogStep(3, steps_total, "allocate new storage")
4115     for dev in instance.disks:
4116       if not dev.iv_name in self.op.disks:
4117         continue
4118       size = dev.size
4119       cfg.SetDiskID(dev, tgt_node)
4120       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4121       names = _GenerateUniqueNames(cfg, lv_names)
4122       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4123                              logical_id=(vgname, names[0]))
4124       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4125                              logical_id=(vgname, names[1]))
4126       new_lvs = [lv_data, lv_meta]
4127       old_lvs = dev.children
4128       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4129       info("creating new local storage on %s for %s" %
4130            (tgt_node, dev.iv_name))
4131       # since we *always* want to create this LV, we use the
4132       # _Create...OnPrimary (which forces the creation), even if we
4133       # are talking about the secondary node
4134       for new_lv in new_lvs:
4135         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4136                                         _GetInstanceInfoText(instance)):
4137           raise errors.OpExecError("Failed to create new LV named '%s' on"
4138                                    " node '%s'" %
4139                                    (new_lv.logical_id[1], tgt_node))
4140
4141     # Step: for each lv, detach+rename*2+attach
4142     self.proc.LogStep(4, steps_total, "change drbd configuration")
4143     for dev, old_lvs, new_lvs in iv_names.itervalues():
4144       info("detaching %s drbd from local storage" % dev.iv_name)
4145       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4146         raise errors.OpExecError("Can't detach drbd from local storage on node"
4147                                  " %s for device %s" % (tgt_node, dev.iv_name))
4148       #dev.children = []
4149       #cfg.Update(instance)
4150
4151       # ok, we created the new LVs, so now we know we have the needed
4152       # storage; as such, we proceed on the target node to rename
4153       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4154       # using the assumption that logical_id == physical_id (which in
4155       # turn is the unique_id on that node)
4156
4157       # FIXME(iustin): use a better name for the replaced LVs
4158       temp_suffix = int(time.time())
4159       ren_fn = lambda d, suff: (d.physical_id[0],
4160                                 d.physical_id[1] + "_replaced-%s" % suff)
4161       # build the rename list based on what LVs exist on the node
4162       rlist = []
4163       for to_ren in old_lvs:
4164         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4165         if find_res is not None: # device exists
4166           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4167
4168       info("renaming the old LVs on the target node")
4169       if not rpc.call_blockdev_rename(tgt_node, rlist):
4170         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4171       # now we rename the new LVs to the old LVs
4172       info("renaming the new LVs on the target node")
4173       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4174       if not rpc.call_blockdev_rename(tgt_node, rlist):
4175         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4176
4177       for old, new in zip(old_lvs, new_lvs):
4178         new.logical_id = old.logical_id
4179         cfg.SetDiskID(new, tgt_node)
4180
4181       for disk in old_lvs:
4182         disk.logical_id = ren_fn(disk, temp_suffix)
4183         cfg.SetDiskID(disk, tgt_node)
4184
4185       # now that the new lvs have the old name, we can add them to the device
4186       info("adding new mirror component on %s" % tgt_node)
4187       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4188         for new_lv in new_lvs:
4189           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4190             warning("Can't rollback device %s", hint="manually cleanup unused"
4191                     " logical volumes")
4192         raise errors.OpExecError("Can't add local storage to drbd")
4193
4194       dev.children = new_lvs
4195       cfg.Update(instance)
4196
4197     # Step: wait for sync
4198
4199     # this can fail as the old devices are degraded and _WaitForSync
4200     # does a combined result over all disks, so we don't check its
4201     # return value
4202     self.proc.LogStep(5, steps_total, "sync devices")
4203     _WaitForSync(cfg, instance, self.proc, unlock=True)
4204
4205     # so check manually all the devices
4206     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4207       cfg.SetDiskID(dev, instance.primary_node)
4208       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4209       if is_degr:
4210         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4211
4212     # Step: remove old storage
4213     self.proc.LogStep(6, steps_total, "removing old storage")
4214     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4215       info("remove logical volumes for %s" % name)
4216       for lv in old_lvs:
4217         cfg.SetDiskID(lv, tgt_node)
4218         if not rpc.call_blockdev_remove(tgt_node, lv):
4219           warning("Can't remove old LV", hint="manually remove unused LVs")
4220           continue
4221
4222   def _ExecD8Secondary(self, feedback_fn):
4223     """Replace the secondary node for drbd8.
4224
4225     The algorithm for replace is quite complicated:
4226       - for all disks of the instance:
4227         - create new LVs on the new node with same names
4228         - shutdown the drbd device on the old secondary
4229         - disconnect the drbd network on the primary
4230         - create the drbd device on the new secondary
4231         - network attach the drbd on the primary, using an artifice:
4232           the drbd code for Attach() will connect to the network if it
4233           finds a device which is connected to the good local disks but
4234           not network enabled
4235       - wait for sync across all devices
4236       - remove all disks from the old secondary
4237
4238     Failures are not very well handled.
4239
4240     """
4241     steps_total = 6
4242     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4243     instance = self.instance
4244     iv_names = {}
4245     vgname = self.cfg.GetVGName()
4246     # start of work
4247     cfg = self.cfg
4248     old_node = self.tgt_node
4249     new_node = self.new_node
4250     pri_node = instance.primary_node
4251
4252     # Step: check device activation
4253     self.proc.LogStep(1, steps_total, "check device existence")
4254     info("checking volume groups")
4255     my_vg = cfg.GetVGName()
4256     results = rpc.call_vg_list([pri_node, new_node])
4257     if not results:
4258       raise errors.OpExecError("Can't list volume groups on the nodes")
4259     for node in pri_node, new_node:
4260       res = results.get(node, False)
4261       if not res or my_vg not in res:
4262         raise errors.OpExecError("Volume group '%s' not found on %s" %
4263                                  (my_vg, node))
4264     for dev in instance.disks:
4265       if not dev.iv_name in self.op.disks:
4266         continue
4267       info("checking %s on %s" % (dev.iv_name, pri_node))
4268       cfg.SetDiskID(dev, pri_node)
4269       if not rpc.call_blockdev_find(pri_node, dev):
4270         raise errors.OpExecError("Can't find device %s on node %s" %
4271                                  (dev.iv_name, pri_node))
4272
4273     # Step: check other node consistency
4274     self.proc.LogStep(2, steps_total, "check peer consistency")
4275     for dev in instance.disks:
4276       if not dev.iv_name in self.op.disks:
4277         continue
4278       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4279       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4280         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4281                                  " unsafe to replace the secondary" %
4282                                  pri_node)
4283
4284     # Step: create new storage
4285     self.proc.LogStep(3, steps_total, "allocate new storage")
4286     for dev in instance.disks:
4287       size = dev.size
4288       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4289       # since we *always* want to create this LV, we use the
4290       # _Create...OnPrimary (which forces the creation), even if we
4291       # are talking about the secondary node
4292       for new_lv in dev.children:
4293         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4294                                         _GetInstanceInfoText(instance)):
4295           raise errors.OpExecError("Failed to create new LV named '%s' on"
4296                                    " node '%s'" %
4297                                    (new_lv.logical_id[1], new_node))
4298
4299       iv_names[dev.iv_name] = (dev, dev.children)
4300
4301     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4302     for dev in instance.disks:
4303       size = dev.size
4304       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4305       # create new devices on new_node
4306       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4307                               logical_id=(pri_node, new_node,
4308                                           dev.logical_id[2]),
4309                               children=dev.children)
4310       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4311                                         new_drbd, False,
4312                                       _GetInstanceInfoText(instance)):
4313         raise errors.OpExecError("Failed to create new DRBD on"
4314                                  " node '%s'" % new_node)
4315
4316     for dev in instance.disks:
4317       # we have new devices, shutdown the drbd on the old secondary
4318       info("shutting down drbd for %s on old node" % dev.iv_name)
4319       cfg.SetDiskID(dev, old_node)
4320       if not rpc.call_blockdev_shutdown(old_node, dev):
4321         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4322                 hint="Please cleanup this device manually as soon as possible")
4323
4324     info("detaching primary drbds from the network (=> standalone)")
4325     done = 0
4326     for dev in instance.disks:
4327       cfg.SetDiskID(dev, pri_node)
4328       # set the physical (unique in bdev terms) id to None, meaning
4329       # detach from network
4330       dev.physical_id = (None,) * len(dev.physical_id)
4331       # and 'find' the device, which will 'fix' it to match the
4332       # standalone state
4333       if rpc.call_blockdev_find(pri_node, dev):
4334         done += 1
4335       else:
4336         warning("Failed to detach drbd %s from network, unusual case" %
4337                 dev.iv_name)
4338
4339     if not done:
4340       # no detaches succeeded (very unlikely)
4341       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4342
4343     # if we managed to detach at least one, we update all the disks of
4344     # the instance to point to the new secondary
4345     info("updating instance configuration")
4346     for dev in instance.disks:
4347       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4348       cfg.SetDiskID(dev, pri_node)
4349     cfg.Update(instance)
4350
4351     # and now perform the drbd attach
4352     info("attaching primary drbds to new secondary (standalone => connected)")
4353     failures = []
4354     for dev in instance.disks:
4355       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4356       # since the attach is smart, it's enough to 'find' the device,
4357       # it will automatically activate the network, if the physical_id
4358       # is correct
4359       cfg.SetDiskID(dev, pri_node)
4360       if not rpc.call_blockdev_find(pri_node, dev):
4361         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4362                 "please do a gnt-instance info to see the status of disks")
4363
4364     # this can fail as the old devices are degraded and _WaitForSync
4365     # does a combined result over all disks, so we don't check its
4366     # return value
4367     self.proc.LogStep(5, steps_total, "sync devices")
4368     _WaitForSync(cfg, instance, self.proc, unlock=True)
4369
4370     # so check manually all the devices
4371     for name, (dev, old_lvs) in iv_names.iteritems():
4372       cfg.SetDiskID(dev, pri_node)
4373       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4374       if is_degr:
4375         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4376
4377     self.proc.LogStep(6, steps_total, "removing old storage")
4378     for name, (dev, old_lvs) in iv_names.iteritems():
4379       info("remove logical volumes for %s" % name)
4380       for lv in old_lvs:
4381         cfg.SetDiskID(lv, old_node)
4382         if not rpc.call_blockdev_remove(old_node, lv):
4383           warning("Can't remove LV on old secondary",
4384                   hint="Cleanup stale volumes by hand")
4385
4386   def Exec(self, feedback_fn):
4387     """Execute disk replacement.
4388
4389     This dispatches the disk replacement to the appropriate handler.
4390
4391     """
4392     instance = self.instance
4393
4394     # Activate the instance disks if we're replacing them on a down instance
4395     if instance.status == "down":
4396       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4397       self.proc.ChainOpCode(op)
4398
4399     if instance.disk_template == constants.DT_REMOTE_RAID1:
4400       fn = self._ExecRR1
4401     elif instance.disk_template == constants.DT_DRBD8:
4402       if self.op.remote_node is None:
4403         fn = self._ExecD8DiskOnly
4404       else:
4405         fn = self._ExecD8Secondary
4406     else:
4407       raise errors.ProgrammerError("Unhandled disk replacement case")
4408
4409     ret = fn(feedback_fn)
4410
4411     # Deactivate the instance disks if we're replacing them on a down instance
4412     if instance.status == "down":
4413       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4414       self.proc.ChainOpCode(op)
4415
4416     return ret
4417
4418
4419 class LUGrowDisk(LogicalUnit):
4420   """Grow a disk of an instance.
4421
4422   """
4423   HPATH = "disk-grow"
4424   HTYPE = constants.HTYPE_INSTANCE
4425   _OP_REQP = ["instance_name", "disk", "amount"]
4426
4427   def BuildHooksEnv(self):
4428     """Build hooks env.
4429
4430     This runs on the master, the primary and all the secondaries.
4431
4432     """
4433     env = {
4434       "DISK": self.op.disk,
4435       "AMOUNT": self.op.amount,
4436       }
4437     env.update(_BuildInstanceHookEnvByObject(self.instance))
4438     nl = [
4439       self.sstore.GetMasterNode(),
4440       self.instance.primary_node,
4441       ]
4442     return env, nl, nl
4443
4444   def CheckPrereq(self):
4445     """Check prerequisites.
4446
4447     This checks that the instance is in the cluster.
4448
4449     """
4450     instance = self.cfg.GetInstanceInfo(
4451       self.cfg.ExpandInstanceName(self.op.instance_name))
4452     if instance is None:
4453       raise errors.OpPrereqError("Instance '%s' not known" %
4454                                  self.op.instance_name)
4455     self.instance = instance
4456     self.op.instance_name = instance.name
4457
4458     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4459       raise errors.OpPrereqError("Instance's disk layout does not support"
4460                                  " growing.")
4461
4462     if instance.FindDisk(self.op.disk) is None:
4463       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4464                                  (name, instance.name))
4465
4466     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4467     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4468     for node in nodenames:
4469       info = nodeinfo.get(node, None)
4470       if not info:
4471         raise errors.OpPrereqError("Cannot get current information"
4472                                    " from node '%s'" % node)
4473       vg_free = info.get('vg_free', None)
4474       if not isinstance(vg_free, int):
4475         raise errors.OpPrereqError("Can't compute free disk space on"
4476                                    " node %s" % node)
4477       if self.op.amount > info['vg_free']:
4478         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4479                                    " %d MiB available, %d MiB required" %
4480                                    (node, info['vg_free'], self.op.amount))
4481
4482   def Exec(self, feedback_fn):
4483     """Execute disk grow.
4484
4485     """
4486     instance = self.instance
4487     disk = instance.FindDisk(self.op.disk)
4488     for node in (instance.secondary_nodes + (instance.primary_node,)):
4489       self.cfg.SetDiskID(disk, node)
4490       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4491       if not result or not isinstance(result, tuple) or len(result) != 2:
4492         raise errors.OpExecError("grow request failed to node %s" % node)
4493       elif not result[0]:
4494         raise errors.OpExecError("grow request failed to node %s: %s" %
4495                                  (node, result[1]))
4496     disk.RecordGrow(self.op.amount)
4497     self.cfg.Update(instance)
4498     return
4499
4500
4501 class LUQueryInstanceData(NoHooksLU):
4502   """Query runtime instance data.
4503
4504   """
4505   _OP_REQP = ["instances"]
4506
4507   def CheckPrereq(self):
4508     """Check prerequisites.
4509
4510     This only checks the optional instance list against the existing names.
4511
4512     """
4513     if not isinstance(self.op.instances, list):
4514       raise errors.OpPrereqError("Invalid argument type 'instances'")
4515     if self.op.instances:
4516       self.wanted_instances = []
4517       names = self.op.instances
4518       for name in names:
4519         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4520         if instance is None:
4521           raise errors.OpPrereqError("No such instance name '%s'" % name)
4522         self.wanted_instances.append(instance)
4523     else:
4524       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4525                                in self.cfg.GetInstanceList()]
4526     return
4527
4528
4529   def _ComputeDiskStatus(self, instance, snode, dev):
4530     """Compute block device status.
4531
4532     """
4533     self.cfg.SetDiskID(dev, instance.primary_node)
4534     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4535     if dev.dev_type in constants.LDS_DRBD:
4536       # we change the snode then (otherwise we use the one passed in)
4537       if dev.logical_id[0] == instance.primary_node:
4538         snode = dev.logical_id[1]
4539       else:
4540         snode = dev.logical_id[0]
4541
4542     if snode:
4543       self.cfg.SetDiskID(dev, snode)
4544       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4545     else:
4546       dev_sstatus = None
4547
4548     if dev.children:
4549       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4550                       for child in dev.children]
4551     else:
4552       dev_children = []
4553
4554     data = {
4555       "iv_name": dev.iv_name,
4556       "dev_type": dev.dev_type,
4557       "logical_id": dev.logical_id,
4558       "physical_id": dev.physical_id,
4559       "pstatus": dev_pstatus,
4560       "sstatus": dev_sstatus,
4561       "children": dev_children,
4562       }
4563
4564     return data
4565
4566   def Exec(self, feedback_fn):
4567     """Gather and return data"""
4568     result = {}
4569     for instance in self.wanted_instances:
4570       remote_info = rpc.call_instance_info(instance.primary_node,
4571                                                 instance.name)
4572       if remote_info and "state" in remote_info:
4573         remote_state = "up"
4574       else:
4575         remote_state = "down"
4576       if instance.status == "down":
4577         config_state = "down"
4578       else:
4579         config_state = "up"
4580
4581       disks = [self._ComputeDiskStatus(instance, None, device)
4582                for device in instance.disks]
4583
4584       idict = {
4585         "name": instance.name,
4586         "config_state": config_state,
4587         "run_state": remote_state,
4588         "pnode": instance.primary_node,
4589         "snodes": instance.secondary_nodes,
4590         "os": instance.os,
4591         "memory": instance.memory,
4592         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4593         "disks": disks,
4594         "vcpus": instance.vcpus,
4595         }
4596
4597       htkind = self.sstore.GetHypervisorType()
4598       if htkind == constants.HT_XEN_PVM30:
4599         idict["kernel_path"] = instance.kernel_path
4600         idict["initrd_path"] = instance.initrd_path
4601
4602       if htkind == constants.HT_XEN_HVM31:
4603         idict["hvm_boot_order"] = instance.hvm_boot_order
4604         idict["hvm_acpi"] = instance.hvm_acpi
4605         idict["hvm_pae"] = instance.hvm_pae
4606         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4607
4608       if htkind in constants.HTS_REQ_PORT:
4609         idict["vnc_bind_address"] = instance.vnc_bind_address
4610         idict["network_port"] = instance.network_port
4611
4612       result[instance.name] = idict
4613
4614     return result
4615
4616
4617 class LUSetInstanceParms(LogicalUnit):
4618   """Modifies an instances's parameters.
4619
4620   """
4621   HPATH = "instance-modify"
4622   HTYPE = constants.HTYPE_INSTANCE
4623   _OP_REQP = ["instance_name"]
4624
4625   def BuildHooksEnv(self):
4626     """Build hooks env.
4627
4628     This runs on the master, primary and secondaries.
4629
4630     """
4631     args = dict()
4632     if self.mem:
4633       args['memory'] = self.mem
4634     if self.vcpus:
4635       args['vcpus'] = self.vcpus
4636     if self.do_ip or self.do_bridge or self.mac:
4637       if self.do_ip:
4638         ip = self.ip
4639       else:
4640         ip = self.instance.nics[0].ip
4641       if self.bridge:
4642         bridge = self.bridge
4643       else:
4644         bridge = self.instance.nics[0].bridge
4645       if self.mac:
4646         mac = self.mac
4647       else:
4648         mac = self.instance.nics[0].mac
4649       args['nics'] = [(ip, bridge, mac)]
4650     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4651     nl = [self.sstore.GetMasterNode(),
4652           self.instance.primary_node] + list(self.instance.secondary_nodes)
4653     return env, nl, nl
4654
4655   def CheckPrereq(self):
4656     """Check prerequisites.
4657
4658     This only checks the instance list against the existing names.
4659
4660     """
4661     self.mem = getattr(self.op, "mem", None)
4662     self.vcpus = getattr(self.op, "vcpus", None)
4663     self.ip = getattr(self.op, "ip", None)
4664     self.mac = getattr(self.op, "mac", None)
4665     self.bridge = getattr(self.op, "bridge", None)
4666     self.kernel_path = getattr(self.op, "kernel_path", None)
4667     self.initrd_path = getattr(self.op, "initrd_path", None)
4668     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4669     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4670     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4671     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4672     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4673     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4674                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4675                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4676                  self.vnc_bind_address]
4677     if all_parms.count(None) == len(all_parms):
4678       raise errors.OpPrereqError("No changes submitted")
4679     if self.mem is not None:
4680       try:
4681         self.mem = int(self.mem)
4682       except ValueError, err:
4683         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4684     if self.vcpus is not None:
4685       try:
4686         self.vcpus = int(self.vcpus)
4687       except ValueError, err:
4688         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4689     if self.ip is not None:
4690       self.do_ip = True
4691       if self.ip.lower() == "none":
4692         self.ip = None
4693       else:
4694         if not utils.IsValidIP(self.ip):
4695           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4696     else:
4697       self.do_ip = False
4698     self.do_bridge = (self.bridge is not None)
4699     if self.mac is not None:
4700       if self.cfg.IsMacInUse(self.mac):
4701         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4702                                    self.mac)
4703       if not utils.IsValidMac(self.mac):
4704         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4705
4706     if self.kernel_path is not None:
4707       self.do_kernel_path = True
4708       if self.kernel_path == constants.VALUE_NONE:
4709         raise errors.OpPrereqError("Can't set instance to no kernel")
4710
4711       if self.kernel_path != constants.VALUE_DEFAULT:
4712         if not os.path.isabs(self.kernel_path):
4713           raise errors.OpPrereqError("The kernel path must be an absolute"
4714                                     " filename")
4715     else:
4716       self.do_kernel_path = False
4717
4718     if self.initrd_path is not None:
4719       self.do_initrd_path = True
4720       if self.initrd_path not in (constants.VALUE_NONE,
4721                                   constants.VALUE_DEFAULT):
4722         if not os.path.isabs(self.initrd_path):
4723           raise errors.OpPrereqError("The initrd path must be an absolute"
4724                                     " filename")
4725     else:
4726       self.do_initrd_path = False
4727
4728     # boot order verification
4729     if self.hvm_boot_order is not None:
4730       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4731         if len(self.hvm_boot_order.strip("acdn")) != 0:
4732           raise errors.OpPrereqError("invalid boot order specified,"
4733                                      " must be one or more of [acdn]"
4734                                      " or 'default'")
4735
4736     # hvm_cdrom_image_path verification
4737     if self.op.hvm_cdrom_image_path is not None:
4738       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4739               self.op.hvm_cdrom_image_path.lower() == "none"):
4740         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4741                                    " be an absolute path or None, not %s" %
4742                                    self.op.hvm_cdrom_image_path)
4743       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4744               self.op.hvm_cdrom_image_path.lower() == "none"):
4745         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4746                                    " regular file or a symlink pointing to"
4747                                    " an existing regular file, not %s" %
4748                                    self.op.hvm_cdrom_image_path)
4749
4750     # vnc_bind_address verification
4751     if self.op.vnc_bind_address is not None:
4752       if not utils.IsValidIP(self.op.vnc_bind_address):
4753         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4754                                    " like a valid IP address" %
4755                                    self.op.vnc_bind_address)
4756
4757     instance = self.cfg.GetInstanceInfo(
4758       self.cfg.ExpandInstanceName(self.op.instance_name))
4759     if instance is None:
4760       raise errors.OpPrereqError("No such instance name '%s'" %
4761                                  self.op.instance_name)
4762     self.op.instance_name = instance.name
4763     self.instance = instance
4764     return
4765
4766   def Exec(self, feedback_fn):
4767     """Modifies an instance.
4768
4769     All parameters take effect only at the next restart of the instance.
4770     """
4771     result = []
4772     instance = self.instance
4773     if self.mem:
4774       instance.memory = self.mem
4775       result.append(("mem", self.mem))
4776     if self.vcpus:
4777       instance.vcpus = self.vcpus
4778       result.append(("vcpus",  self.vcpus))
4779     if self.do_ip:
4780       instance.nics[0].ip = self.ip
4781       result.append(("ip", self.ip))
4782     if self.bridge:
4783       instance.nics[0].bridge = self.bridge
4784       result.append(("bridge", self.bridge))
4785     if self.mac:
4786       instance.nics[0].mac = self.mac
4787       result.append(("mac", self.mac))
4788     if self.do_kernel_path:
4789       instance.kernel_path = self.kernel_path
4790       result.append(("kernel_path", self.kernel_path))
4791     if self.do_initrd_path:
4792       instance.initrd_path = self.initrd_path
4793       result.append(("initrd_path", self.initrd_path))
4794     if self.hvm_boot_order:
4795       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4796         instance.hvm_boot_order = None
4797       else:
4798         instance.hvm_boot_order = self.hvm_boot_order
4799       result.append(("hvm_boot_order", self.hvm_boot_order))
4800     if self.hvm_acpi is not None:
4801       instance.hvm_acpi = self.hvm_acpi
4802       result.append(("hvm_acpi", self.hvm_acpi))
4803     if self.hvm_pae is not None:
4804       instance.hvm_pae = self.hvm_pae
4805       result.append(("hvm_pae", self.hvm_pae))
4806     if self.hvm_cdrom_image_path:
4807       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4808         instance.hvm_cdrom_image_path = None
4809       else:
4810         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4811       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4812     if self.vnc_bind_address:
4813       instance.vnc_bind_address = self.vnc_bind_address
4814       result.append(("vnc_bind_address", self.vnc_bind_address))
4815
4816     self.cfg.AddInstance(instance)
4817
4818     return result
4819
4820
4821 class LUQueryExports(NoHooksLU):
4822   """Query the exports list
4823
4824   """
4825   _OP_REQP = []
4826
4827   def CheckPrereq(self):
4828     """Check that the nodelist contains only existing nodes.
4829
4830     """
4831     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4832
4833   def Exec(self, feedback_fn):
4834     """Compute the list of all the exported system images.
4835
4836     Returns:
4837       a dictionary with the structure node->(export-list)
4838       where export-list is a list of the instances exported on
4839       that node.
4840
4841     """
4842     return rpc.call_export_list(self.nodes)
4843
4844
4845 class LUExportInstance(LogicalUnit):
4846   """Export an instance to an image in the cluster.
4847
4848   """
4849   HPATH = "instance-export"
4850   HTYPE = constants.HTYPE_INSTANCE
4851   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4852
4853   def BuildHooksEnv(self):
4854     """Build hooks env.
4855
4856     This will run on the master, primary node and target node.
4857
4858     """
4859     env = {
4860       "EXPORT_NODE": self.op.target_node,
4861       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4862       }
4863     env.update(_BuildInstanceHookEnvByObject(self.instance))
4864     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4865           self.op.target_node]
4866     return env, nl, nl
4867
4868   def CheckPrereq(self):
4869     """Check prerequisites.
4870
4871     This checks that the instance and node names are valid.
4872
4873     """
4874     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4875     self.instance = self.cfg.GetInstanceInfo(instance_name)
4876     if self.instance is None:
4877       raise errors.OpPrereqError("Instance '%s' not found" %
4878                                  self.op.instance_name)
4879
4880     # node verification
4881     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4882     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4883
4884     if self.dst_node is None:
4885       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4886                                  self.op.target_node)
4887     self.op.target_node = self.dst_node.name
4888
4889   def Exec(self, feedback_fn):
4890     """Export an instance to an image in the cluster.
4891
4892     """
4893     instance = self.instance
4894     dst_node = self.dst_node
4895     src_node = instance.primary_node
4896     if self.op.shutdown:
4897       # shutdown the instance, but not the disks
4898       if not rpc.call_instance_shutdown(src_node, instance):
4899         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4900                                  (instance.name, src_node))
4901
4902     vgname = self.cfg.GetVGName()
4903
4904     snap_disks = []
4905
4906     try:
4907       for disk in instance.disks:
4908         if disk.iv_name == "sda":
4909           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4910           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4911
4912           if not new_dev_name:
4913             logger.Error("could not snapshot block device %s on node %s" %
4914                          (disk.logical_id[1], src_node))
4915           else:
4916             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4917                                       logical_id=(vgname, new_dev_name),
4918                                       physical_id=(vgname, new_dev_name),
4919                                       iv_name=disk.iv_name)
4920             snap_disks.append(new_dev)
4921
4922     finally:
4923       if self.op.shutdown and instance.status == "up":
4924         if not rpc.call_instance_start(src_node, instance, None):
4925           _ShutdownInstanceDisks(instance, self.cfg)
4926           raise errors.OpExecError("Could not start instance")
4927
4928     # TODO: check for size
4929
4930     for dev in snap_disks:
4931       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4932                                            instance):
4933         logger.Error("could not export block device %s from node"
4934                      " %s to node %s" %
4935                      (dev.logical_id[1], src_node, dst_node.name))
4936       if not rpc.call_blockdev_remove(src_node, dev):
4937         logger.Error("could not remove snapshot block device %s from"
4938                      " node %s" % (dev.logical_id[1], src_node))
4939
4940     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4941       logger.Error("could not finalize export for instance %s on node %s" %
4942                    (instance.name, dst_node.name))
4943
4944     nodelist = self.cfg.GetNodeList()
4945     nodelist.remove(dst_node.name)
4946
4947     # on one-node clusters nodelist will be empty after the removal
4948     # if we proceed the backup would be removed because OpQueryExports
4949     # substitutes an empty list with the full cluster node list.
4950     if nodelist:
4951       op = opcodes.OpQueryExports(nodes=nodelist)
4952       exportlist = self.proc.ChainOpCode(op)
4953       for node in exportlist:
4954         if instance.name in exportlist[node]:
4955           if not rpc.call_export_remove(node, instance.name):
4956             logger.Error("could not remove older export for instance %s"
4957                          " on node %s" % (instance.name, node))
4958
4959
4960 class LURemoveExport(NoHooksLU):
4961   """Remove exports related to the named instance.
4962
4963   """
4964   _OP_REQP = ["instance_name"]
4965
4966   def CheckPrereq(self):
4967     """Check prerequisites.
4968     """
4969     pass
4970
4971   def Exec(self, feedback_fn):
4972     """Remove any export.
4973
4974     """
4975     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4976     # If the instance was not found we'll try with the name that was passed in.
4977     # This will only work if it was an FQDN, though.
4978     fqdn_warn = False
4979     if not instance_name:
4980       fqdn_warn = True
4981       instance_name = self.op.instance_name
4982
4983     op = opcodes.OpQueryExports(nodes=[])
4984     exportlist = self.proc.ChainOpCode(op)
4985     found = False
4986     for node in exportlist:
4987       if instance_name in exportlist[node]:
4988         found = True
4989         if not rpc.call_export_remove(node, instance_name):
4990           logger.Error("could not remove export for instance %s"
4991                        " on node %s" % (instance_name, node))
4992
4993     if fqdn_warn and not found:
4994       feedback_fn("Export not found. If trying to remove an export belonging"
4995                   " to a deleted instance please use its Fully Qualified"
4996                   " Domain Name.")
4997
4998
4999 class TagsLU(NoHooksLU):
5000   """Generic tags LU.
5001
5002   This is an abstract class which is the parent of all the other tags LUs.
5003
5004   """
5005   def CheckPrereq(self):
5006     """Check prerequisites.
5007
5008     """
5009     if self.op.kind == constants.TAG_CLUSTER:
5010       self.target = self.cfg.GetClusterInfo()
5011     elif self.op.kind == constants.TAG_NODE:
5012       name = self.cfg.ExpandNodeName(self.op.name)
5013       if name is None:
5014         raise errors.OpPrereqError("Invalid node name (%s)" %
5015                                    (self.op.name,))
5016       self.op.name = name
5017       self.target = self.cfg.GetNodeInfo(name)
5018     elif self.op.kind == constants.TAG_INSTANCE:
5019       name = self.cfg.ExpandInstanceName(self.op.name)
5020       if name is None:
5021         raise errors.OpPrereqError("Invalid instance name (%s)" %
5022                                    (self.op.name,))
5023       self.op.name = name
5024       self.target = self.cfg.GetInstanceInfo(name)
5025     else:
5026       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5027                                  str(self.op.kind))
5028
5029
5030 class LUGetTags(TagsLU):
5031   """Returns the tags of a given object.
5032
5033   """
5034   _OP_REQP = ["kind", "name"]
5035
5036   def Exec(self, feedback_fn):
5037     """Returns the tag list.
5038
5039     """
5040     return self.target.GetTags()
5041
5042
5043 class LUSearchTags(NoHooksLU):
5044   """Searches the tags for a given pattern.
5045
5046   """
5047   _OP_REQP = ["pattern"]
5048
5049   def CheckPrereq(self):
5050     """Check prerequisites.
5051
5052     This checks the pattern passed for validity by compiling it.
5053
5054     """
5055     try:
5056       self.re = re.compile(self.op.pattern)
5057     except re.error, err:
5058       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5059                                  (self.op.pattern, err))
5060
5061   def Exec(self, feedback_fn):
5062     """Returns the tag list.
5063
5064     """
5065     cfg = self.cfg
5066     tgts = [("/cluster", cfg.GetClusterInfo())]
5067     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5068     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5069     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5070     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5071     results = []
5072     for path, target in tgts:
5073       for tag in target.GetTags():
5074         if self.re.search(tag):
5075           results.append((path, tag))
5076     return results
5077
5078
5079 class LUAddTags(TagsLU):
5080   """Sets a tag on a given object.
5081
5082   """
5083   _OP_REQP = ["kind", "name", "tags"]
5084
5085   def CheckPrereq(self):
5086     """Check prerequisites.
5087
5088     This checks the type and length of the tag name and value.
5089
5090     """
5091     TagsLU.CheckPrereq(self)
5092     for tag in self.op.tags:
5093       objects.TaggableObject.ValidateTag(tag)
5094
5095   def Exec(self, feedback_fn):
5096     """Sets the tag.
5097
5098     """
5099     try:
5100       for tag in self.op.tags:
5101         self.target.AddTag(tag)
5102     except errors.TagError, err:
5103       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5104     try:
5105       self.cfg.Update(self.target)
5106     except errors.ConfigurationError:
5107       raise errors.OpRetryError("There has been a modification to the"
5108                                 " config file and the operation has been"
5109                                 " aborted. Please retry.")
5110
5111
5112 class LUDelTags(TagsLU):
5113   """Delete a list of tags from a given object.
5114
5115   """
5116   _OP_REQP = ["kind", "name", "tags"]
5117
5118   def CheckPrereq(self):
5119     """Check prerequisites.
5120
5121     This checks that we have the given tag.
5122
5123     """
5124     TagsLU.CheckPrereq(self)
5125     for tag in self.op.tags:
5126       objects.TaggableObject.ValidateTag(tag)
5127     del_tags = frozenset(self.op.tags)
5128     cur_tags = self.target.GetTags()
5129     if not del_tags <= cur_tags:
5130       diff_tags = del_tags - cur_tags
5131       diff_names = ["'%s'" % tag for tag in diff_tags]
5132       diff_names.sort()
5133       raise errors.OpPrereqError("Tag(s) %s not found" %
5134                                  (",".join(diff_names)))
5135
5136   def Exec(self, feedback_fn):
5137     """Remove the tag from the object.
5138
5139     """
5140     for tag in self.op.tags:
5141       self.target.RemoveTag(tag)
5142     try:
5143       self.cfg.Update(self.target)
5144     except errors.ConfigurationError:
5145       raise errors.OpRetryError("There has been a modification to the"
5146                                 " config file and the operation has been"
5147                                 " aborted. Please retry.")
5148
5149 class LUTestDelay(NoHooksLU):
5150   """Sleep for a specified amount of time.
5151
5152   This LU sleeps on the master and/or nodes for a specified amoutn of
5153   time.
5154
5155   """
5156   _OP_REQP = ["duration", "on_master", "on_nodes"]
5157
5158   def CheckPrereq(self):
5159     """Check prerequisites.
5160
5161     This checks that we have a good list of nodes and/or the duration
5162     is valid.
5163
5164     """
5165
5166     if self.op.on_nodes:
5167       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5168
5169   def Exec(self, feedback_fn):
5170     """Do the actual sleep.
5171
5172     """
5173     if self.op.on_master:
5174       if not utils.TestDelay(self.op.duration):
5175         raise errors.OpExecError("Error during master delay test")
5176     if self.op.on_nodes:
5177       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5178       if not result:
5179         raise errors.OpExecError("Complete failure from rpc call")
5180       for node, node_result in result.items():
5181         if not node_result:
5182           raise errors.OpExecError("Failure during rpc call to node %s,"
5183                                    " result: %s" % (node, node_result))
5184
5185
5186 class IAllocator(object):
5187   """IAllocator framework.
5188
5189   An IAllocator instance has three sets of attributes:
5190     - cfg/sstore that are needed to query the cluster
5191     - input data (all members of the _KEYS class attribute are required)
5192     - four buffer attributes (in|out_data|text), that represent the
5193       input (to the external script) in text and data structure format,
5194       and the output from it, again in two formats
5195     - the result variables from the script (success, info, nodes) for
5196       easy usage
5197
5198   """
5199   _ALLO_KEYS = [
5200     "mem_size", "disks", "disk_template",
5201     "os", "tags", "nics", "vcpus",
5202     ]
5203   _RELO_KEYS = [
5204     "relocate_from",
5205     ]
5206
5207   def __init__(self, cfg, sstore, mode, name, **kwargs):
5208     self.cfg = cfg
5209     self.sstore = sstore
5210     # init buffer variables
5211     self.in_text = self.out_text = self.in_data = self.out_data = None
5212     # init all input fields so that pylint is happy
5213     self.mode = mode
5214     self.name = name
5215     self.mem_size = self.disks = self.disk_template = None
5216     self.os = self.tags = self.nics = self.vcpus = None
5217     self.relocate_from = None
5218     # computed fields
5219     self.required_nodes = None
5220     # init result fields
5221     self.success = self.info = self.nodes = None
5222     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5223       keyset = self._ALLO_KEYS
5224     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5225       keyset = self._RELO_KEYS
5226     else:
5227       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5228                                    " IAllocator" % self.mode)
5229     for key in kwargs:
5230       if key not in keyset:
5231         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5232                                      " IAllocator" % key)
5233       setattr(self, key, kwargs[key])
5234     for key in keyset:
5235       if key not in kwargs:
5236         raise errors.ProgrammerError("Missing input parameter '%s' to"
5237                                      " IAllocator" % key)
5238     self._BuildInputData()
5239
5240   def _ComputeClusterData(self):
5241     """Compute the generic allocator input data.
5242
5243     This is the data that is independent of the actual operation.
5244
5245     """
5246     cfg = self.cfg
5247     # cluster data
5248     data = {
5249       "version": 1,
5250       "cluster_name": self.sstore.GetClusterName(),
5251       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5252       "hypervisor_type": self.sstore.GetHypervisorType(),
5253       # we don't have job IDs
5254       }
5255
5256     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5257
5258     # node data
5259     node_results = {}
5260     node_list = cfg.GetNodeList()
5261     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5262     for nname in node_list:
5263       ninfo = cfg.GetNodeInfo(nname)
5264       if nname not in node_data or not isinstance(node_data[nname], dict):
5265         raise errors.OpExecError("Can't get data for node %s" % nname)
5266       remote_info = node_data[nname]
5267       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5268                    'vg_size', 'vg_free', 'cpu_total']:
5269         if attr not in remote_info:
5270           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5271                                    (nname, attr))
5272         try:
5273           remote_info[attr] = int(remote_info[attr])
5274         except ValueError, err:
5275           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5276                                    " %s" % (nname, attr, str(err)))
5277       # compute memory used by primary instances
5278       i_p_mem = i_p_up_mem = 0
5279       for iinfo in i_list:
5280         if iinfo.primary_node == nname:
5281           i_p_mem += iinfo.memory
5282           if iinfo.status == "up":
5283             i_p_up_mem += iinfo.memory
5284
5285       # compute memory used by instances
5286       pnr = {
5287         "tags": list(ninfo.GetTags()),
5288         "total_memory": remote_info['memory_total'],
5289         "reserved_memory": remote_info['memory_dom0'],
5290         "free_memory": remote_info['memory_free'],
5291         "i_pri_memory": i_p_mem,
5292         "i_pri_up_memory": i_p_up_mem,
5293         "total_disk": remote_info['vg_size'],
5294         "free_disk": remote_info['vg_free'],
5295         "primary_ip": ninfo.primary_ip,
5296         "secondary_ip": ninfo.secondary_ip,
5297         "total_cpus": remote_info['cpu_total'],
5298         }
5299       node_results[nname] = pnr
5300     data["nodes"] = node_results
5301
5302     # instance data
5303     instance_data = {}
5304     for iinfo in i_list:
5305       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5306                   for n in iinfo.nics]
5307       pir = {
5308         "tags": list(iinfo.GetTags()),
5309         "should_run": iinfo.status == "up",
5310         "vcpus": iinfo.vcpus,
5311         "memory": iinfo.memory,
5312         "os": iinfo.os,
5313         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5314         "nics": nic_data,
5315         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5316         "disk_template": iinfo.disk_template,
5317         }
5318       instance_data[iinfo.name] = pir
5319
5320     data["instances"] = instance_data
5321
5322     self.in_data = data
5323
5324   def _AddNewInstance(self):
5325     """Add new instance data to allocator structure.
5326
5327     This in combination with _AllocatorGetClusterData will create the
5328     correct structure needed as input for the allocator.
5329
5330     The checks for the completeness of the opcode must have already been
5331     done.
5332
5333     """
5334     data = self.in_data
5335     if len(self.disks) != 2:
5336       raise errors.OpExecError("Only two-disk configurations supported")
5337
5338     disk_space = _ComputeDiskSize(self.disk_template,
5339                                   self.disks[0]["size"], self.disks[1]["size"])
5340
5341     if self.disk_template in constants.DTS_NET_MIRROR:
5342       self.required_nodes = 2
5343     else:
5344       self.required_nodes = 1
5345     request = {
5346       "type": "allocate",
5347       "name": self.name,
5348       "disk_template": self.disk_template,
5349       "tags": self.tags,
5350       "os": self.os,
5351       "vcpus": self.vcpus,
5352       "memory": self.mem_size,
5353       "disks": self.disks,
5354       "disk_space_total": disk_space,
5355       "nics": self.nics,
5356       "required_nodes": self.required_nodes,
5357       }
5358     data["request"] = request
5359
5360   def _AddRelocateInstance(self):
5361     """Add relocate instance data to allocator structure.
5362
5363     This in combination with _IAllocatorGetClusterData will create the
5364     correct structure needed as input for the allocator.
5365
5366     The checks for the completeness of the opcode must have already been
5367     done.
5368
5369     """
5370     instance = self.cfg.GetInstanceInfo(self.name)
5371     if instance is None:
5372       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5373                                    " IAllocator" % self.name)
5374
5375     if instance.disk_template not in constants.DTS_NET_MIRROR:
5376       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5377
5378     if len(instance.secondary_nodes) != 1:
5379       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5380
5381     self.required_nodes = 1
5382
5383     disk_space = _ComputeDiskSize(instance.disk_template,
5384                                   instance.disks[0].size,
5385                                   instance.disks[1].size)
5386
5387     request = {
5388       "type": "relocate",
5389       "name": self.name,
5390       "disk_space_total": disk_space,
5391       "required_nodes": self.required_nodes,
5392       "relocate_from": self.relocate_from,
5393       }
5394     self.in_data["request"] = request
5395
5396   def _BuildInputData(self):
5397     """Build input data structures.
5398
5399     """
5400     self._ComputeClusterData()
5401
5402     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5403       self._AddNewInstance()
5404     else:
5405       self._AddRelocateInstance()
5406
5407     self.in_text = serializer.Dump(self.in_data)
5408
5409   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5410     """Run an instance allocator and return the results.
5411
5412     """
5413     data = self.in_text
5414
5415     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5416
5417     if not isinstance(result, tuple) or len(result) != 4:
5418       raise errors.OpExecError("Invalid result from master iallocator runner")
5419
5420     rcode, stdout, stderr, fail = result
5421
5422     if rcode == constants.IARUN_NOTFOUND:
5423       raise errors.OpExecError("Can't find allocator '%s'" % name)
5424     elif rcode == constants.IARUN_FAILURE:
5425         raise errors.OpExecError("Instance allocator call failed: %s,"
5426                                  " output: %s" %
5427                                  (fail, stdout+stderr))
5428     self.out_text = stdout
5429     if validate:
5430       self._ValidateResult()
5431
5432   def _ValidateResult(self):
5433     """Process the allocator results.
5434
5435     This will process and if successful save the result in
5436     self.out_data and the other parameters.
5437
5438     """
5439     try:
5440       rdict = serializer.Load(self.out_text)
5441     except Exception, err:
5442       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5443
5444     if not isinstance(rdict, dict):
5445       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5446
5447     for key in "success", "info", "nodes":
5448       if key not in rdict:
5449         raise errors.OpExecError("Can't parse iallocator results:"
5450                                  " missing key '%s'" % key)
5451       setattr(self, key, rdict[key])
5452
5453     if not isinstance(rdict["nodes"], list):
5454       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5455                                " is not a list")
5456     self.out_data = rdict
5457
5458
5459 class LUTestAllocator(NoHooksLU):
5460   """Run allocator tests.
5461
5462   This LU runs the allocator tests
5463
5464   """
5465   _OP_REQP = ["direction", "mode", "name"]
5466
5467   def CheckPrereq(self):
5468     """Check prerequisites.
5469
5470     This checks the opcode parameters depending on the director and mode test.
5471
5472     """
5473     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5474       for attr in ["name", "mem_size", "disks", "disk_template",
5475                    "os", "tags", "nics", "vcpus"]:
5476         if not hasattr(self.op, attr):
5477           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5478                                      attr)
5479       iname = self.cfg.ExpandInstanceName(self.op.name)
5480       if iname is not None:
5481         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5482                                    iname)
5483       if not isinstance(self.op.nics, list):
5484         raise errors.OpPrereqError("Invalid parameter 'nics'")
5485       for row in self.op.nics:
5486         if (not isinstance(row, dict) or
5487             "mac" not in row or
5488             "ip" not in row or
5489             "bridge" not in row):
5490           raise errors.OpPrereqError("Invalid contents of the"
5491                                      " 'nics' parameter")
5492       if not isinstance(self.op.disks, list):
5493         raise errors.OpPrereqError("Invalid parameter 'disks'")
5494       if len(self.op.disks) != 2:
5495         raise errors.OpPrereqError("Only two-disk configurations supported")
5496       for row in self.op.disks:
5497         if (not isinstance(row, dict) or
5498             "size" not in row or
5499             not isinstance(row["size"], int) or
5500             "mode" not in row or
5501             row["mode"] not in ['r', 'w']):
5502           raise errors.OpPrereqError("Invalid contents of the"
5503                                      " 'disks' parameter")
5504     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5505       if not hasattr(self.op, "name"):
5506         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5507       fname = self.cfg.ExpandInstanceName(self.op.name)
5508       if fname is None:
5509         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5510                                    self.op.name)
5511       self.op.name = fname
5512       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5513     else:
5514       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5515                                  self.op.mode)
5516
5517     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5518       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5519         raise errors.OpPrereqError("Missing allocator name")
5520     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5521       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5522                                  self.op.direction)
5523
5524   def Exec(self, feedback_fn):
5525     """Run the allocator test.
5526
5527     """
5528     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5529       ial = IAllocator(self.cfg, self.sstore,
5530                        mode=self.op.mode,
5531                        name=self.op.name,
5532                        mem_size=self.op.mem_size,
5533                        disks=self.op.disks,
5534                        disk_template=self.op.disk_template,
5535                        os=self.op.os,
5536                        tags=self.op.tags,
5537                        nics=self.op.nics,
5538                        vcpus=self.op.vcpus,
5539                        )
5540     else:
5541       ial = IAllocator(self.cfg, self.sstore,
5542                        mode=self.op.mode,
5543                        name=self.op.name,
5544                        relocate_from=list(self.relocate_from),
5545                        )
5546
5547     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5548       result = ial.in_text
5549     else:
5550       ial.Run(self.op.allocator, validate=False)
5551       result = ial.out_text
5552     return result