grow-disk: wait until resync is completed
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           needed_mem += instance_cfg[instance].memory
836         if nodeinfo['mfree'] < needed_mem:
837           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
838                       " failovers should node %s fail" % (node, prinode))
839           bad = True
840     return bad
841
842   def CheckPrereq(self):
843     """Check prerequisites.
844
845     Transform the list of checks we're going to skip into a set and check that
846     all its members are valid.
847
848     """
849     self.skip_set = frozenset(self.op.skip_checks)
850     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851       raise errors.OpPrereqError("Invalid checks to be skipped specified")
852
853   def BuildHooksEnv(self):
854     """Build hooks env.
855
856     Cluster-Verify hooks just rone in the post phase and their failure makes
857     the output be logged in the verify output and the verification to fail.
858
859     """
860     all_nodes = self.cfg.GetNodeList()
861     tags = self.cfg.GetClusterInfo().GetTags()
862     # TODO: populate the environment with useful information for verify hooks
863     env = {
864       "CLUSTER_TAGS": " ".join(tags),
865       }
866     return env, [], all_nodes
867
868   def Exec(self, feedback_fn):
869     """Verify integrity of cluster, performing various test on nodes.
870
871     """
872     bad = False
873     feedback_fn("* Verifying global settings")
874     for msg in self.cfg.VerifyConfig():
875       feedback_fn("  - ERROR: %s" % msg)
876
877     vg_name = self.cfg.GetVGName()
878     nodelist = utils.NiceSort(self.cfg.GetNodeList())
879     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881     i_non_redundant = [] # Non redundant instances
882     node_volume = {}
883     node_instance = {}
884     node_info = {}
885     instance_cfg = {}
886
887     # FIXME: verify OS list
888     # do local checksums
889     file_names = list(self.sstore.GetFileList())
890     file_names.append(constants.SSL_CERT_FILE)
891     file_names.append(constants.CLUSTER_CONF_FILE)
892     local_checksums = utils.FingerprintFiles(file_names)
893
894     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896     all_instanceinfo = rpc.call_instance_list(nodelist)
897     all_vglist = rpc.call_vg_list(nodelist)
898     node_verify_param = {
899       'filelist': file_names,
900       'nodelist': nodelist,
901       'hypervisor': None,
902       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903                         for node in nodeinfo]
904       }
905     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906     all_rversion = rpc.call_version(nodelist)
907     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
908
909     incomplete_nodeinfo = False
910
911     for node in nodelist:
912       feedback_fn("* Verifying node %s" % node)
913       result = self._VerifyNode(node, file_names, local_checksums,
914                                 all_vglist[node], all_nvinfo[node],
915                                 all_rversion[node], feedback_fn)
916       bad = bad or result
917
918       # node_volume
919       volumeinfo = all_volumeinfo[node]
920
921       if isinstance(volumeinfo, basestring):
922         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
923                     (node, volumeinfo[-400:].encode('string_escape')))
924         bad = True
925         node_volume[node] = {}
926       elif not isinstance(volumeinfo, dict):
927         feedback_fn("  - ERROR: connection to %s failed" % (node,))
928         bad = True
929         incomplete_nodeinfo = True
930         continue
931       else:
932         node_volume[node] = volumeinfo
933
934       # node_instance
935       nodeinstance = all_instanceinfo[node]
936       if type(nodeinstance) != list:
937         feedback_fn("  - ERROR: connection to %s failed" % (node,))
938         bad = True
939         incomplete_nodeinfo = True
940         continue
941
942       node_instance[node] = nodeinstance
943
944       # node_info
945       nodeinfo = all_ninfo[node]
946       if not isinstance(nodeinfo, dict):
947         feedback_fn("  - ERROR: connection to %s failed" % (node,))
948         bad = True
949         incomplete_nodeinfo = True
950         continue
951
952       try:
953         node_info[node] = {
954           "mfree": int(nodeinfo['memory_free']),
955           "dfree": int(nodeinfo['vg_free']),
956           "pinst": [],
957           "sinst": [],
958           # dictionary holding all instances this node is secondary for,
959           # grouped by their primary node. Each key is a cluster node, and each
960           # value is a list of instances which have the key as primary and the
961           # current node as secondary.  this is handy to calculate N+1 memory
962           # availability if you can only failover from a primary to its
963           # secondary.
964           "sinst-by-pnode": {},
965         }
966       except (ValueError, TypeError):
967         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
968         bad = True
969         incomplete_nodeinfo = True
970         continue
971
972     node_vol_should = {}
973
974     for instance in instancelist:
975       feedback_fn("* Verifying instance %s" % instance)
976       inst_config = self.cfg.GetInstanceInfo(instance)
977       result =  self._VerifyInstance(instance, inst_config, node_volume,
978                                      node_instance, feedback_fn)
979       bad = bad or result
980
981       inst_config.MapLVsByNode(node_vol_should)
982
983       instance_cfg[instance] = inst_config
984
985       pnode = inst_config.primary_node
986       if pnode in node_info:
987         node_info[pnode]['pinst'].append(instance)
988       else:
989         feedback_fn("  - ERROR: instance %s, connection to primary node"
990                     " %s failed" % (instance, pnode))
991         bad = True
992
993       # If the instance is non-redundant we cannot survive losing its primary
994       # node, so we are not N+1 compliant. On the other hand we have no disk
995       # templates with more than one secondary so that situation is not well
996       # supported either.
997       # FIXME: does not support file-backed instances
998       if len(inst_config.secondary_nodes) == 0:
999         i_non_redundant.append(instance)
1000       elif len(inst_config.secondary_nodes) > 1:
1001         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1002                     % instance)
1003
1004       for snode in inst_config.secondary_nodes:
1005         if snode in node_info:
1006           node_info[snode]['sinst'].append(instance)
1007           if pnode not in node_info[snode]['sinst-by-pnode']:
1008             node_info[snode]['sinst-by-pnode'][pnode] = []
1009           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1010         else:
1011           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1012                       " %s failed" % (instance, snode))
1013
1014     feedback_fn("* Verifying orphan volumes")
1015     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1016                                        feedback_fn)
1017     bad = bad or result
1018
1019     feedback_fn("* Verifying remaining instances")
1020     result = self._VerifyOrphanInstances(instancelist, node_instance,
1021                                          feedback_fn)
1022     bad = bad or result
1023
1024     if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1025         not incomplete_nodeinfo):
1026       feedback_fn("* Verifying N+1 Memory redundancy")
1027       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1028       bad = bad or result
1029
1030     feedback_fn("* Other Notes")
1031     if i_non_redundant:
1032       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1033                   % len(i_non_redundant))
1034
1035     return int(bad)
1036
1037   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1038     """Analize the post-hooks' result, handle it, and send some
1039     nicely-formatted feedback back to the user.
1040
1041     Args:
1042       phase: the hooks phase that has just been run
1043       hooks_results: the results of the multi-node hooks rpc call
1044       feedback_fn: function to send feedback back to the caller
1045       lu_result: previous Exec result
1046
1047     """
1048     # We only really run POST phase hooks, and are only interested in their results
1049     if phase == constants.HOOKS_PHASE_POST:
1050       # Used to change hooks' output to proper indentation
1051       indent_re = re.compile('^', re.M)
1052       feedback_fn("* Hooks Results")
1053       if not hooks_results:
1054         feedback_fn("  - ERROR: general communication failure")
1055         lu_result = 1
1056       else:
1057         for node_name in hooks_results:
1058           show_node_header = True
1059           res = hooks_results[node_name]
1060           if res is False or not isinstance(res, list):
1061             feedback_fn("    Communication failure")
1062             lu_result = 1
1063             continue
1064           for script, hkr, output in res:
1065             if hkr == constants.HKR_FAIL:
1066               # The node header is only shown once, if there are
1067               # failing hooks on that node
1068               if show_node_header:
1069                 feedback_fn("  Node %s:" % node_name)
1070                 show_node_header = False
1071               feedback_fn("    ERROR: Script %s failed, output:" % script)
1072               output = indent_re.sub('      ', output)
1073               feedback_fn("%s" % output)
1074               lu_result = 1
1075
1076       return lu_result
1077
1078
1079 class LUVerifyDisks(NoHooksLU):
1080   """Verifies the cluster disks status.
1081
1082   """
1083   _OP_REQP = []
1084
1085   def CheckPrereq(self):
1086     """Check prerequisites.
1087
1088     This has no prerequisites.
1089
1090     """
1091     pass
1092
1093   def Exec(self, feedback_fn):
1094     """Verify integrity of cluster disks.
1095
1096     """
1097     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1098
1099     vg_name = self.cfg.GetVGName()
1100     nodes = utils.NiceSort(self.cfg.GetNodeList())
1101     instances = [self.cfg.GetInstanceInfo(name)
1102                  for name in self.cfg.GetInstanceList()]
1103
1104     nv_dict = {}
1105     for inst in instances:
1106       inst_lvs = {}
1107       if (inst.status != "up" or
1108           inst.disk_template not in constants.DTS_NET_MIRROR):
1109         continue
1110       inst.MapLVsByNode(inst_lvs)
1111       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1112       for node, vol_list in inst_lvs.iteritems():
1113         for vol in vol_list:
1114           nv_dict[(node, vol)] = inst
1115
1116     if not nv_dict:
1117       return result
1118
1119     node_lvs = rpc.call_volume_list(nodes, vg_name)
1120
1121     to_act = set()
1122     for node in nodes:
1123       # node_volume
1124       lvs = node_lvs[node]
1125
1126       if isinstance(lvs, basestring):
1127         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1128         res_nlvm[node] = lvs
1129       elif not isinstance(lvs, dict):
1130         logger.Info("connection to node %s failed or invalid data returned" %
1131                     (node,))
1132         res_nodes.append(node)
1133         continue
1134
1135       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1136         inst = nv_dict.pop((node, lv_name), None)
1137         if (not lv_online and inst is not None
1138             and inst.name not in res_instances):
1139           res_instances.append(inst.name)
1140
1141     # any leftover items in nv_dict are missing LVs, let's arrange the
1142     # data better
1143     for key, inst in nv_dict.iteritems():
1144       if inst.name not in res_missing:
1145         res_missing[inst.name] = []
1146       res_missing[inst.name].append(key)
1147
1148     return result
1149
1150
1151 class LURenameCluster(LogicalUnit):
1152   """Rename the cluster.
1153
1154   """
1155   HPATH = "cluster-rename"
1156   HTYPE = constants.HTYPE_CLUSTER
1157   _OP_REQP = ["name"]
1158
1159   def BuildHooksEnv(self):
1160     """Build hooks env.
1161
1162     """
1163     env = {
1164       "OP_TARGET": self.sstore.GetClusterName(),
1165       "NEW_NAME": self.op.name,
1166       }
1167     mn = self.sstore.GetMasterNode()
1168     return env, [mn], [mn]
1169
1170   def CheckPrereq(self):
1171     """Verify that the passed name is a valid one.
1172
1173     """
1174     hostname = utils.HostInfo(self.op.name)
1175
1176     new_name = hostname.name
1177     self.ip = new_ip = hostname.ip
1178     old_name = self.sstore.GetClusterName()
1179     old_ip = self.sstore.GetMasterIP()
1180     if new_name == old_name and new_ip == old_ip:
1181       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1182                                  " cluster has changed")
1183     if new_ip != old_ip:
1184       result = utils.RunCmd(["fping", "-q", new_ip])
1185       if not result.failed:
1186         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1187                                    " reachable on the network. Aborting." %
1188                                    new_ip)
1189
1190     self.op.name = new_name
1191
1192   def Exec(self, feedback_fn):
1193     """Rename the cluster.
1194
1195     """
1196     clustername = self.op.name
1197     ip = self.ip
1198     ss = self.sstore
1199
1200     # shutdown the master IP
1201     master = ss.GetMasterNode()
1202     if not rpc.call_node_stop_master(master):
1203       raise errors.OpExecError("Could not disable the master role")
1204
1205     try:
1206       # modify the sstore
1207       ss.SetKey(ss.SS_MASTER_IP, ip)
1208       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1209
1210       # Distribute updated ss config to all nodes
1211       myself = self.cfg.GetNodeInfo(master)
1212       dist_nodes = self.cfg.GetNodeList()
1213       if myself.name in dist_nodes:
1214         dist_nodes.remove(myself.name)
1215
1216       logger.Debug("Copying updated ssconf data to all nodes")
1217       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1218         fname = ss.KeyToFilename(keyname)
1219         result = rpc.call_upload_file(dist_nodes, fname)
1220         for to_node in dist_nodes:
1221           if not result[to_node]:
1222             logger.Error("copy of file %s to node %s failed" %
1223                          (fname, to_node))
1224     finally:
1225       if not rpc.call_node_start_master(master):
1226         logger.Error("Could not re-enable the master role on the master,"
1227                      " please restart manually.")
1228
1229
1230 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1231   """Sleep and poll for an instance's disk to sync.
1232
1233   """
1234   if not instance.disks:
1235     return True
1236
1237   if not oneshot:
1238     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1239
1240   node = instance.primary_node
1241
1242   for dev in instance.disks:
1243     cfgw.SetDiskID(dev, node)
1244
1245   retries = 0
1246   while True:
1247     max_time = 0
1248     done = True
1249     cumul_degraded = False
1250     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1251     if not rstats:
1252       proc.LogWarning("Can't get any data from node %s" % node)
1253       retries += 1
1254       if retries >= 10:
1255         raise errors.RemoteError("Can't contact node %s for mirror data,"
1256                                  " aborting." % node)
1257       time.sleep(6)
1258       continue
1259     retries = 0
1260     for i in range(len(rstats)):
1261       mstat = rstats[i]
1262       if mstat is None:
1263         proc.LogWarning("Can't compute data for node %s/%s" %
1264                         (node, instance.disks[i].iv_name))
1265         continue
1266       # we ignore the ldisk parameter
1267       perc_done, est_time, is_degraded, _ = mstat
1268       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1269       if perc_done is not None:
1270         done = False
1271         if est_time is not None:
1272           rem_time = "%d estimated seconds remaining" % est_time
1273           max_time = est_time
1274         else:
1275           rem_time = "no time estimate"
1276         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1277                      (instance.disks[i].iv_name, perc_done, rem_time))
1278     if done or oneshot:
1279       break
1280
1281     if unlock:
1282       utils.Unlock('cmd')
1283     try:
1284       time.sleep(min(60, max_time))
1285     finally:
1286       if unlock:
1287         utils.Lock('cmd')
1288
1289   if done:
1290     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1291   return not cumul_degraded
1292
1293
1294 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1295   """Check that mirrors are not degraded.
1296
1297   The ldisk parameter, if True, will change the test from the
1298   is_degraded attribute (which represents overall non-ok status for
1299   the device(s)) to the ldisk (representing the local storage status).
1300
1301   """
1302   cfgw.SetDiskID(dev, node)
1303   if ldisk:
1304     idx = 6
1305   else:
1306     idx = 5
1307
1308   result = True
1309   if on_primary or dev.AssembleOnSecondary():
1310     rstats = rpc.call_blockdev_find(node, dev)
1311     if not rstats:
1312       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1313       result = False
1314     else:
1315       result = result and (not rstats[idx])
1316   if dev.children:
1317     for child in dev.children:
1318       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1319
1320   return result
1321
1322
1323 class LUDiagnoseOS(NoHooksLU):
1324   """Logical unit for OS diagnose/query.
1325
1326   """
1327   _OP_REQP = ["output_fields", "names"]
1328
1329   def CheckPrereq(self):
1330     """Check prerequisites.
1331
1332     This always succeeds, since this is a pure query LU.
1333
1334     """
1335     if self.op.names:
1336       raise errors.OpPrereqError("Selective OS query not supported")
1337
1338     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1339     _CheckOutputFields(static=[],
1340                        dynamic=self.dynamic_fields,
1341                        selected=self.op.output_fields)
1342
1343   @staticmethod
1344   def _DiagnoseByOS(node_list, rlist):
1345     """Remaps a per-node return list into an a per-os per-node dictionary
1346
1347       Args:
1348         node_list: a list with the names of all nodes
1349         rlist: a map with node names as keys and OS objects as values
1350
1351       Returns:
1352         map: a map with osnames as keys and as value another map, with
1353              nodes as
1354              keys and list of OS objects as values
1355              e.g. {"debian-etch": {"node1": [<object>,...],
1356                                    "node2": [<object>,]}
1357                   }
1358
1359     """
1360     all_os = {}
1361     for node_name, nr in rlist.iteritems():
1362       if not nr:
1363         continue
1364       for os in nr:
1365         if os.name not in all_os:
1366           # build a list of nodes for this os containing empty lists
1367           # for each node in node_list
1368           all_os[os.name] = {}
1369           for nname in node_list:
1370             all_os[os.name][nname] = []
1371         all_os[os.name][node_name].append(os)
1372     return all_os
1373
1374   def Exec(self, feedback_fn):
1375     """Compute the list of OSes.
1376
1377     """
1378     node_list = self.cfg.GetNodeList()
1379     node_data = rpc.call_os_diagnose(node_list)
1380     if node_data == False:
1381       raise errors.OpExecError("Can't gather the list of OSes")
1382     pol = self._DiagnoseByOS(node_list, node_data)
1383     output = []
1384     for os_name, os_data in pol.iteritems():
1385       row = []
1386       for field in self.op.output_fields:
1387         if field == "name":
1388           val = os_name
1389         elif field == "valid":
1390           val = utils.all([osl and osl[0] for osl in os_data.values()])
1391         elif field == "node_status":
1392           val = {}
1393           for node_name, nos_list in os_data.iteritems():
1394             val[node_name] = [(v.status, v.path) for v in nos_list]
1395         else:
1396           raise errors.ParameterError(field)
1397         row.append(val)
1398       output.append(row)
1399
1400     return output
1401
1402
1403 class LURemoveNode(LogicalUnit):
1404   """Logical unit for removing a node.
1405
1406   """
1407   HPATH = "node-remove"
1408   HTYPE = constants.HTYPE_NODE
1409   _OP_REQP = ["node_name"]
1410
1411   def BuildHooksEnv(self):
1412     """Build hooks env.
1413
1414     This doesn't run on the target node in the pre phase as a failed
1415     node would not allows itself to run.
1416
1417     """
1418     env = {
1419       "OP_TARGET": self.op.node_name,
1420       "NODE_NAME": self.op.node_name,
1421       }
1422     all_nodes = self.cfg.GetNodeList()
1423     all_nodes.remove(self.op.node_name)
1424     return env, all_nodes, all_nodes
1425
1426   def CheckPrereq(self):
1427     """Check prerequisites.
1428
1429     This checks:
1430      - the node exists in the configuration
1431      - it does not have primary or secondary instances
1432      - it's not the master
1433
1434     Any errors are signalled by raising errors.OpPrereqError.
1435
1436     """
1437     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1438     if node is None:
1439       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1440
1441     instance_list = self.cfg.GetInstanceList()
1442
1443     masternode = self.sstore.GetMasterNode()
1444     if node.name == masternode:
1445       raise errors.OpPrereqError("Node is the master node,"
1446                                  " you need to failover first.")
1447
1448     for instance_name in instance_list:
1449       instance = self.cfg.GetInstanceInfo(instance_name)
1450       if node.name == instance.primary_node:
1451         raise errors.OpPrereqError("Instance %s still running on the node,"
1452                                    " please remove first." % instance_name)
1453       if node.name in instance.secondary_nodes:
1454         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1455                                    " please remove first." % instance_name)
1456     self.op.node_name = node.name
1457     self.node = node
1458
1459   def Exec(self, feedback_fn):
1460     """Removes the node from the cluster.
1461
1462     """
1463     node = self.node
1464     logger.Info("stopping the node daemon and removing configs from node %s" %
1465                 node.name)
1466
1467     rpc.call_node_leave_cluster(node.name)
1468
1469     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1470
1471     logger.Info("Removing node %s from config" % node.name)
1472
1473     self.cfg.RemoveNode(node.name)
1474
1475     _RemoveHostFromEtcHosts(node.name)
1476
1477
1478 class LUQueryNodes(NoHooksLU):
1479   """Logical unit for querying nodes.
1480
1481   """
1482   _OP_REQP = ["output_fields", "names"]
1483
1484   def CheckPrereq(self):
1485     """Check prerequisites.
1486
1487     This checks that the fields required are valid output fields.
1488
1489     """
1490     self.dynamic_fields = frozenset([
1491       "dtotal", "dfree",
1492       "mtotal", "mnode", "mfree",
1493       "bootid",
1494       "ctotal",
1495       ])
1496
1497     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1498                                "pinst_list", "sinst_list",
1499                                "pip", "sip", "tags"],
1500                        dynamic=self.dynamic_fields,
1501                        selected=self.op.output_fields)
1502
1503     self.wanted = _GetWantedNodes(self, self.op.names)
1504
1505   def Exec(self, feedback_fn):
1506     """Computes the list of nodes and their attributes.
1507
1508     """
1509     nodenames = self.wanted
1510     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1511
1512     # begin data gathering
1513
1514     if self.dynamic_fields.intersection(self.op.output_fields):
1515       live_data = {}
1516       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1517       for name in nodenames:
1518         nodeinfo = node_data.get(name, None)
1519         if nodeinfo:
1520           live_data[name] = {
1521             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1522             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1523             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1524             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1525             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1526             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1527             "bootid": nodeinfo['bootid'],
1528             }
1529         else:
1530           live_data[name] = {}
1531     else:
1532       live_data = dict.fromkeys(nodenames, {})
1533
1534     node_to_primary = dict([(name, set()) for name in nodenames])
1535     node_to_secondary = dict([(name, set()) for name in nodenames])
1536
1537     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1538                              "sinst_cnt", "sinst_list"))
1539     if inst_fields & frozenset(self.op.output_fields):
1540       instancelist = self.cfg.GetInstanceList()
1541
1542       for instance_name in instancelist:
1543         inst = self.cfg.GetInstanceInfo(instance_name)
1544         if inst.primary_node in node_to_primary:
1545           node_to_primary[inst.primary_node].add(inst.name)
1546         for secnode in inst.secondary_nodes:
1547           if secnode in node_to_secondary:
1548             node_to_secondary[secnode].add(inst.name)
1549
1550     # end data gathering
1551
1552     output = []
1553     for node in nodelist:
1554       node_output = []
1555       for field in self.op.output_fields:
1556         if field == "name":
1557           val = node.name
1558         elif field == "pinst_list":
1559           val = list(node_to_primary[node.name])
1560         elif field == "sinst_list":
1561           val = list(node_to_secondary[node.name])
1562         elif field == "pinst_cnt":
1563           val = len(node_to_primary[node.name])
1564         elif field == "sinst_cnt":
1565           val = len(node_to_secondary[node.name])
1566         elif field == "pip":
1567           val = node.primary_ip
1568         elif field == "sip":
1569           val = node.secondary_ip
1570         elif field == "tags":
1571           val = list(node.GetTags())
1572         elif field in self.dynamic_fields:
1573           val = live_data[node.name].get(field, None)
1574         else:
1575           raise errors.ParameterError(field)
1576         node_output.append(val)
1577       output.append(node_output)
1578
1579     return output
1580
1581
1582 class LUQueryNodeVolumes(NoHooksLU):
1583   """Logical unit for getting volumes on node(s).
1584
1585   """
1586   _OP_REQP = ["nodes", "output_fields"]
1587
1588   def CheckPrereq(self):
1589     """Check prerequisites.
1590
1591     This checks that the fields required are valid output fields.
1592
1593     """
1594     self.nodes = _GetWantedNodes(self, self.op.nodes)
1595
1596     _CheckOutputFields(static=["node"],
1597                        dynamic=["phys", "vg", "name", "size", "instance"],
1598                        selected=self.op.output_fields)
1599
1600
1601   def Exec(self, feedback_fn):
1602     """Computes the list of nodes and their attributes.
1603
1604     """
1605     nodenames = self.nodes
1606     volumes = rpc.call_node_volumes(nodenames)
1607
1608     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1609              in self.cfg.GetInstanceList()]
1610
1611     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1612
1613     output = []
1614     for node in nodenames:
1615       if node not in volumes or not volumes[node]:
1616         continue
1617
1618       node_vols = volumes[node][:]
1619       node_vols.sort(key=lambda vol: vol['dev'])
1620
1621       for vol in node_vols:
1622         node_output = []
1623         for field in self.op.output_fields:
1624           if field == "node":
1625             val = node
1626           elif field == "phys":
1627             val = vol['dev']
1628           elif field == "vg":
1629             val = vol['vg']
1630           elif field == "name":
1631             val = vol['name']
1632           elif field == "size":
1633             val = int(float(vol['size']))
1634           elif field == "instance":
1635             for inst in ilist:
1636               if node not in lv_by_node[inst]:
1637                 continue
1638               if vol['name'] in lv_by_node[inst][node]:
1639                 val = inst.name
1640                 break
1641             else:
1642               val = '-'
1643           else:
1644             raise errors.ParameterError(field)
1645           node_output.append(str(val))
1646
1647         output.append(node_output)
1648
1649     return output
1650
1651
1652 class LUAddNode(LogicalUnit):
1653   """Logical unit for adding node to the cluster.
1654
1655   """
1656   HPATH = "node-add"
1657   HTYPE = constants.HTYPE_NODE
1658   _OP_REQP = ["node_name"]
1659
1660   def BuildHooksEnv(self):
1661     """Build hooks env.
1662
1663     This will run on all nodes before, and on all nodes + the new node after.
1664
1665     """
1666     env = {
1667       "OP_TARGET": self.op.node_name,
1668       "NODE_NAME": self.op.node_name,
1669       "NODE_PIP": self.op.primary_ip,
1670       "NODE_SIP": self.op.secondary_ip,
1671       }
1672     nodes_0 = self.cfg.GetNodeList()
1673     nodes_1 = nodes_0 + [self.op.node_name, ]
1674     return env, nodes_0, nodes_1
1675
1676   def CheckPrereq(self):
1677     """Check prerequisites.
1678
1679     This checks:
1680      - the new node is not already in the config
1681      - it is resolvable
1682      - its parameters (single/dual homed) matches the cluster
1683
1684     Any errors are signalled by raising errors.OpPrereqError.
1685
1686     """
1687     node_name = self.op.node_name
1688     cfg = self.cfg
1689
1690     dns_data = utils.HostInfo(node_name)
1691
1692     node = dns_data.name
1693     primary_ip = self.op.primary_ip = dns_data.ip
1694     secondary_ip = getattr(self.op, "secondary_ip", None)
1695     if secondary_ip is None:
1696       secondary_ip = primary_ip
1697     if not utils.IsValidIP(secondary_ip):
1698       raise errors.OpPrereqError("Invalid secondary IP given")
1699     self.op.secondary_ip = secondary_ip
1700
1701     node_list = cfg.GetNodeList()
1702     if not self.op.readd and node in node_list:
1703       raise errors.OpPrereqError("Node %s is already in the configuration" %
1704                                  node)
1705     elif self.op.readd and node not in node_list:
1706       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1707
1708     for existing_node_name in node_list:
1709       existing_node = cfg.GetNodeInfo(existing_node_name)
1710
1711       if self.op.readd and node == existing_node_name:
1712         if (existing_node.primary_ip != primary_ip or
1713             existing_node.secondary_ip != secondary_ip):
1714           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1715                                      " address configuration as before")
1716         continue
1717
1718       if (existing_node.primary_ip == primary_ip or
1719           existing_node.secondary_ip == primary_ip or
1720           existing_node.primary_ip == secondary_ip or
1721           existing_node.secondary_ip == secondary_ip):
1722         raise errors.OpPrereqError("New node ip address(es) conflict with"
1723                                    " existing node %s" % existing_node.name)
1724
1725     # check that the type of the node (single versus dual homed) is the
1726     # same as for the master
1727     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1728     master_singlehomed = myself.secondary_ip == myself.primary_ip
1729     newbie_singlehomed = secondary_ip == primary_ip
1730     if master_singlehomed != newbie_singlehomed:
1731       if master_singlehomed:
1732         raise errors.OpPrereqError("The master has no private ip but the"
1733                                    " new node has one")
1734       else:
1735         raise errors.OpPrereqError("The master has a private ip but the"
1736                                    " new node doesn't have one")
1737
1738     # checks reachablity
1739     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1740       raise errors.OpPrereqError("Node not reachable by ping")
1741
1742     if not newbie_singlehomed:
1743       # check reachability from my secondary ip to newbie's secondary ip
1744       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1745                            source=myself.secondary_ip):
1746         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1747                                    " based ping to noded port")
1748
1749     self.new_node = objects.Node(name=node,
1750                                  primary_ip=primary_ip,
1751                                  secondary_ip=secondary_ip)
1752
1753     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1754       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1755         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1756                                    constants.VNC_PASSWORD_FILE)
1757
1758   def Exec(self, feedback_fn):
1759     """Adds the new node to the cluster.
1760
1761     """
1762     new_node = self.new_node
1763     node = new_node.name
1764
1765     # set up inter-node password and certificate and restarts the node daemon
1766     gntpass = self.sstore.GetNodeDaemonPassword()
1767     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1768       raise errors.OpExecError("ganeti password corruption detected")
1769     f = open(constants.SSL_CERT_FILE)
1770     try:
1771       gntpem = f.read(8192)
1772     finally:
1773       f.close()
1774     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1775     # so we use this to detect an invalid certificate; as long as the
1776     # cert doesn't contain this, the here-document will be correctly
1777     # parsed by the shell sequence below
1778     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1779       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1780     if not gntpem.endswith("\n"):
1781       raise errors.OpExecError("PEM must end with newline")
1782     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1783
1784     # and then connect with ssh to set password and start ganeti-noded
1785     # note that all the below variables are sanitized at this point,
1786     # either by being constants or by the checks above
1787     ss = self.sstore
1788     mycommand = ("umask 077 && "
1789                  "echo '%s' > '%s' && "
1790                  "cat > '%s' << '!EOF.' && \n"
1791                  "%s!EOF.\n%s restart" %
1792                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1793                   constants.SSL_CERT_FILE, gntpem,
1794                   constants.NODE_INITD_SCRIPT))
1795
1796     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1797     if result.failed:
1798       raise errors.OpExecError("Remote command on node %s, error: %s,"
1799                                " output: %s" %
1800                                (node, result.fail_reason, result.output))
1801
1802     # check connectivity
1803     time.sleep(4)
1804
1805     result = rpc.call_version([node])[node]
1806     if result:
1807       if constants.PROTOCOL_VERSION == result:
1808         logger.Info("communication to node %s fine, sw version %s match" %
1809                     (node, result))
1810       else:
1811         raise errors.OpExecError("Version mismatch master version %s,"
1812                                  " node version %s" %
1813                                  (constants.PROTOCOL_VERSION, result))
1814     else:
1815       raise errors.OpExecError("Cannot get version from the new node")
1816
1817     # setup ssh on node
1818     logger.Info("copy ssh key to node %s" % node)
1819     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1820     keyarray = []
1821     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1822                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1823                 priv_key, pub_key]
1824
1825     for i in keyfiles:
1826       f = open(i, 'r')
1827       try:
1828         keyarray.append(f.read())
1829       finally:
1830         f.close()
1831
1832     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1833                                keyarray[3], keyarray[4], keyarray[5])
1834
1835     if not result:
1836       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1837
1838     # Add node to our /etc/hosts, and add key to known_hosts
1839     _AddHostToEtcHosts(new_node.name)
1840
1841     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1842                       self.cfg.GetHostKey())
1843
1844     if new_node.secondary_ip != new_node.primary_ip:
1845       if not rpc.call_node_tcp_ping(new_node.name,
1846                                     constants.LOCALHOST_IP_ADDRESS,
1847                                     new_node.secondary_ip,
1848                                     constants.DEFAULT_NODED_PORT,
1849                                     10, False):
1850         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1851                                  " you gave (%s). Please fix and re-run this"
1852                                  " command." % new_node.secondary_ip)
1853
1854     success, msg = ssh.VerifyNodeHostname(node)
1855     if not success:
1856       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1857                                " than the one the resolver gives: %s."
1858                                " Please fix and re-run this command." %
1859                                (node, msg))
1860
1861     # Distribute updated /etc/hosts and known_hosts to all nodes,
1862     # including the node just added
1863     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1864     dist_nodes = self.cfg.GetNodeList()
1865     if not self.op.readd:
1866       dist_nodes.append(node)
1867     if myself.name in dist_nodes:
1868       dist_nodes.remove(myself.name)
1869
1870     logger.Debug("Copying hosts and known_hosts to all nodes")
1871     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1872       result = rpc.call_upload_file(dist_nodes, fname)
1873       for to_node in dist_nodes:
1874         if not result[to_node]:
1875           logger.Error("copy of file %s to node %s failed" %
1876                        (fname, to_node))
1877
1878     to_copy = ss.GetFileList()
1879     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1880       to_copy.append(constants.VNC_PASSWORD_FILE)
1881     for fname in to_copy:
1882       if not ssh.CopyFileToNode(node, fname):
1883         logger.Error("could not copy file %s to node %s" % (fname, node))
1884
1885     if not self.op.readd:
1886       logger.Info("adding node %s to cluster.conf" % node)
1887       self.cfg.AddNode(new_node)
1888
1889
1890 class LUMasterFailover(LogicalUnit):
1891   """Failover the master node to the current node.
1892
1893   This is a special LU in that it must run on a non-master node.
1894
1895   """
1896   HPATH = "master-failover"
1897   HTYPE = constants.HTYPE_CLUSTER
1898   REQ_MASTER = False
1899   _OP_REQP = []
1900
1901   def BuildHooksEnv(self):
1902     """Build hooks env.
1903
1904     This will run on the new master only in the pre phase, and on all
1905     the nodes in the post phase.
1906
1907     """
1908     env = {
1909       "OP_TARGET": self.new_master,
1910       "NEW_MASTER": self.new_master,
1911       "OLD_MASTER": self.old_master,
1912       }
1913     return env, [self.new_master], self.cfg.GetNodeList()
1914
1915   def CheckPrereq(self):
1916     """Check prerequisites.
1917
1918     This checks that we are not already the master.
1919
1920     """
1921     self.new_master = utils.HostInfo().name
1922     self.old_master = self.sstore.GetMasterNode()
1923
1924     if self.old_master == self.new_master:
1925       raise errors.OpPrereqError("This commands must be run on the node"
1926                                  " where you want the new master to be."
1927                                  " %s is already the master" %
1928                                  self.old_master)
1929
1930   def Exec(self, feedback_fn):
1931     """Failover the master node.
1932
1933     This command, when run on a non-master node, will cause the current
1934     master to cease being master, and the non-master to become new
1935     master.
1936
1937     """
1938     #TODO: do not rely on gethostname returning the FQDN
1939     logger.Info("setting master to %s, old master: %s" %
1940                 (self.new_master, self.old_master))
1941
1942     if not rpc.call_node_stop_master(self.old_master):
1943       logger.Error("could disable the master role on the old master"
1944                    " %s, please disable manually" % self.old_master)
1945
1946     ss = self.sstore
1947     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1948     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1949                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1950       logger.Error("could not distribute the new simple store master file"
1951                    " to the other nodes, please check.")
1952
1953     if not rpc.call_node_start_master(self.new_master):
1954       logger.Error("could not start the master role on the new master"
1955                    " %s, please check" % self.new_master)
1956       feedback_fn("Error in activating the master IP on the new master,"
1957                   " please fix manually.")
1958
1959
1960
1961 class LUQueryClusterInfo(NoHooksLU):
1962   """Query cluster configuration.
1963
1964   """
1965   _OP_REQP = []
1966   REQ_MASTER = False
1967
1968   def CheckPrereq(self):
1969     """No prerequsites needed for this LU.
1970
1971     """
1972     pass
1973
1974   def Exec(self, feedback_fn):
1975     """Return cluster config.
1976
1977     """
1978     result = {
1979       "name": self.sstore.GetClusterName(),
1980       "software_version": constants.RELEASE_VERSION,
1981       "protocol_version": constants.PROTOCOL_VERSION,
1982       "config_version": constants.CONFIG_VERSION,
1983       "os_api_version": constants.OS_API_VERSION,
1984       "export_version": constants.EXPORT_VERSION,
1985       "master": self.sstore.GetMasterNode(),
1986       "architecture": (platform.architecture()[0], platform.machine()),
1987       "hypervisor_type": self.sstore.GetHypervisorType(),
1988       }
1989
1990     return result
1991
1992
1993 class LUClusterCopyFile(NoHooksLU):
1994   """Copy file to cluster.
1995
1996   """
1997   _OP_REQP = ["nodes", "filename"]
1998
1999   def CheckPrereq(self):
2000     """Check prerequisites.
2001
2002     It should check that the named file exists and that the given list
2003     of nodes is valid.
2004
2005     """
2006     if not os.path.exists(self.op.filename):
2007       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2008
2009     self.nodes = _GetWantedNodes(self, self.op.nodes)
2010
2011   def Exec(self, feedback_fn):
2012     """Copy a file from master to some nodes.
2013
2014     Args:
2015       opts - class with options as members
2016       args - list containing a single element, the file name
2017     Opts used:
2018       nodes - list containing the name of target nodes; if empty, all nodes
2019
2020     """
2021     filename = self.op.filename
2022
2023     myname = utils.HostInfo().name
2024
2025     for node in self.nodes:
2026       if node == myname:
2027         continue
2028       if not ssh.CopyFileToNode(node, filename):
2029         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2030
2031
2032 class LUDumpClusterConfig(NoHooksLU):
2033   """Return a text-representation of the cluster-config.
2034
2035   """
2036   _OP_REQP = []
2037
2038   def CheckPrereq(self):
2039     """No prerequisites.
2040
2041     """
2042     pass
2043
2044   def Exec(self, feedback_fn):
2045     """Dump a representation of the cluster config to the standard output.
2046
2047     """
2048     return self.cfg.DumpConfig()
2049
2050
2051 class LURunClusterCommand(NoHooksLU):
2052   """Run a command on some nodes.
2053
2054   """
2055   _OP_REQP = ["command", "nodes"]
2056
2057   def CheckPrereq(self):
2058     """Check prerequisites.
2059
2060     It checks that the given list of nodes is valid.
2061
2062     """
2063     self.nodes = _GetWantedNodes(self, self.op.nodes)
2064
2065   def Exec(self, feedback_fn):
2066     """Run a command on some nodes.
2067
2068     """
2069     # put the master at the end of the nodes list
2070     master_node = self.sstore.GetMasterNode()
2071     if master_node in self.nodes:
2072       self.nodes.remove(master_node)
2073       self.nodes.append(master_node)
2074
2075     data = []
2076     for node in self.nodes:
2077       result = ssh.SSHCall(node, "root", self.op.command)
2078       data.append((node, result.output, result.exit_code))
2079
2080     return data
2081
2082
2083 class LUActivateInstanceDisks(NoHooksLU):
2084   """Bring up an instance's disks.
2085
2086   """
2087   _OP_REQP = ["instance_name"]
2088
2089   def CheckPrereq(self):
2090     """Check prerequisites.
2091
2092     This checks that the instance is in the cluster.
2093
2094     """
2095     instance = self.cfg.GetInstanceInfo(
2096       self.cfg.ExpandInstanceName(self.op.instance_name))
2097     if instance is None:
2098       raise errors.OpPrereqError("Instance '%s' not known" %
2099                                  self.op.instance_name)
2100     self.instance = instance
2101
2102
2103   def Exec(self, feedback_fn):
2104     """Activate the disks.
2105
2106     """
2107     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2108     if not disks_ok:
2109       raise errors.OpExecError("Cannot activate block devices")
2110
2111     return disks_info
2112
2113
2114 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2115   """Prepare the block devices for an instance.
2116
2117   This sets up the block devices on all nodes.
2118
2119   Args:
2120     instance: a ganeti.objects.Instance object
2121     ignore_secondaries: if true, errors on secondary nodes won't result
2122                         in an error return from the function
2123
2124   Returns:
2125     false if the operation failed
2126     list of (host, instance_visible_name, node_visible_name) if the operation
2127          suceeded with the mapping from node devices to instance devices
2128   """
2129   device_info = []
2130   disks_ok = True
2131   iname = instance.name
2132   # With the two passes mechanism we try to reduce the window of
2133   # opportunity for the race condition of switching DRBD to primary
2134   # before handshaking occured, but we do not eliminate it
2135
2136   # The proper fix would be to wait (with some limits) until the
2137   # connection has been made and drbd transitions from WFConnection
2138   # into any other network-connected state (Connected, SyncTarget,
2139   # SyncSource, etc.)
2140
2141   # 1st pass, assemble on all nodes in secondary mode
2142   for inst_disk in instance.disks:
2143     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2144       cfg.SetDiskID(node_disk, node)
2145       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2146       if not result:
2147         logger.Error("could not prepare block device %s on node %s"
2148                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2149         if not ignore_secondaries:
2150           disks_ok = False
2151
2152   # FIXME: race condition on drbd migration to primary
2153
2154   # 2nd pass, do only the primary node
2155   for inst_disk in instance.disks:
2156     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2157       if node != instance.primary_node:
2158         continue
2159       cfg.SetDiskID(node_disk, node)
2160       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2161       if not result:
2162         logger.Error("could not prepare block device %s on node %s"
2163                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2164         disks_ok = False
2165     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2166
2167   # leave the disks configured for the primary node
2168   # this is a workaround that would be fixed better by
2169   # improving the logical/physical id handling
2170   for disk in instance.disks:
2171     cfg.SetDiskID(disk, instance.primary_node)
2172
2173   return disks_ok, device_info
2174
2175
2176 def _StartInstanceDisks(cfg, instance, force):
2177   """Start the disks of an instance.
2178
2179   """
2180   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2181                                            ignore_secondaries=force)
2182   if not disks_ok:
2183     _ShutdownInstanceDisks(instance, cfg)
2184     if force is not None and not force:
2185       logger.Error("If the message above refers to a secondary node,"
2186                    " you can retry the operation using '--force'.")
2187     raise errors.OpExecError("Disk consistency error")
2188
2189
2190 class LUDeactivateInstanceDisks(NoHooksLU):
2191   """Shutdown an instance's disks.
2192
2193   """
2194   _OP_REQP = ["instance_name"]
2195
2196   def CheckPrereq(self):
2197     """Check prerequisites.
2198
2199     This checks that the instance is in the cluster.
2200
2201     """
2202     instance = self.cfg.GetInstanceInfo(
2203       self.cfg.ExpandInstanceName(self.op.instance_name))
2204     if instance is None:
2205       raise errors.OpPrereqError("Instance '%s' not known" %
2206                                  self.op.instance_name)
2207     self.instance = instance
2208
2209   def Exec(self, feedback_fn):
2210     """Deactivate the disks
2211
2212     """
2213     instance = self.instance
2214     ins_l = rpc.call_instance_list([instance.primary_node])
2215     ins_l = ins_l[instance.primary_node]
2216     if not type(ins_l) is list:
2217       raise errors.OpExecError("Can't contact node '%s'" %
2218                                instance.primary_node)
2219
2220     if self.instance.name in ins_l:
2221       raise errors.OpExecError("Instance is running, can't shutdown"
2222                                " block devices.")
2223
2224     _ShutdownInstanceDisks(instance, self.cfg)
2225
2226
2227 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2228   """Shutdown block devices of an instance.
2229
2230   This does the shutdown on all nodes of the instance.
2231
2232   If the ignore_primary is false, errors on the primary node are
2233   ignored.
2234
2235   """
2236   result = True
2237   for disk in instance.disks:
2238     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2239       cfg.SetDiskID(top_disk, node)
2240       if not rpc.call_blockdev_shutdown(node, top_disk):
2241         logger.Error("could not shutdown block device %s on node %s" %
2242                      (disk.iv_name, node))
2243         if not ignore_primary or node != instance.primary_node:
2244           result = False
2245   return result
2246
2247
2248 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2249   """Checks if a node has enough free memory.
2250
2251   This function check if a given node has the needed amount of free
2252   memory. In case the node has less memory or we cannot get the
2253   information from the node, this function raise an OpPrereqError
2254   exception.
2255
2256   Args:
2257     - cfg: a ConfigWriter instance
2258     - node: the node name
2259     - reason: string to use in the error message
2260     - requested: the amount of memory in MiB
2261
2262   """
2263   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2264   if not nodeinfo or not isinstance(nodeinfo, dict):
2265     raise errors.OpPrereqError("Could not contact node %s for resource"
2266                              " information" % (node,))
2267
2268   free_mem = nodeinfo[node].get('memory_free')
2269   if not isinstance(free_mem, int):
2270     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2271                              " was '%s'" % (node, free_mem))
2272   if requested > free_mem:
2273     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2274                              " needed %s MiB, available %s MiB" %
2275                              (node, reason, requested, free_mem))
2276
2277
2278 class LUStartupInstance(LogicalUnit):
2279   """Starts an instance.
2280
2281   """
2282   HPATH = "instance-start"
2283   HTYPE = constants.HTYPE_INSTANCE
2284   _OP_REQP = ["instance_name", "force"]
2285
2286   def BuildHooksEnv(self):
2287     """Build hooks env.
2288
2289     This runs on master, primary and secondary nodes of the instance.
2290
2291     """
2292     env = {
2293       "FORCE": self.op.force,
2294       }
2295     env.update(_BuildInstanceHookEnvByObject(self.instance))
2296     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2297           list(self.instance.secondary_nodes))
2298     return env, nl, nl
2299
2300   def CheckPrereq(self):
2301     """Check prerequisites.
2302
2303     This checks that the instance is in the cluster.
2304
2305     """
2306     instance = self.cfg.GetInstanceInfo(
2307       self.cfg.ExpandInstanceName(self.op.instance_name))
2308     if instance is None:
2309       raise errors.OpPrereqError("Instance '%s' not known" %
2310                                  self.op.instance_name)
2311
2312     # check bridges existance
2313     _CheckInstanceBridgesExist(instance)
2314
2315     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2316                          "starting instance %s" % instance.name,
2317                          instance.memory)
2318
2319     self.instance = instance
2320     self.op.instance_name = instance.name
2321
2322   def Exec(self, feedback_fn):
2323     """Start the instance.
2324
2325     """
2326     instance = self.instance
2327     force = self.op.force
2328     extra_args = getattr(self.op, "extra_args", "")
2329
2330     self.cfg.MarkInstanceUp(instance.name)
2331
2332     node_current = instance.primary_node
2333
2334     _StartInstanceDisks(self.cfg, instance, force)
2335
2336     if not rpc.call_instance_start(node_current, instance, extra_args):
2337       _ShutdownInstanceDisks(instance, self.cfg)
2338       raise errors.OpExecError("Could not start instance")
2339
2340
2341 class LURebootInstance(LogicalUnit):
2342   """Reboot an instance.
2343
2344   """
2345   HPATH = "instance-reboot"
2346   HTYPE = constants.HTYPE_INSTANCE
2347   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2348
2349   def BuildHooksEnv(self):
2350     """Build hooks env.
2351
2352     This runs on master, primary and secondary nodes of the instance.
2353
2354     """
2355     env = {
2356       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2357       }
2358     env.update(_BuildInstanceHookEnvByObject(self.instance))
2359     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2360           list(self.instance.secondary_nodes))
2361     return env, nl, nl
2362
2363   def CheckPrereq(self):
2364     """Check prerequisites.
2365
2366     This checks that the instance is in the cluster.
2367
2368     """
2369     instance = self.cfg.GetInstanceInfo(
2370       self.cfg.ExpandInstanceName(self.op.instance_name))
2371     if instance is None:
2372       raise errors.OpPrereqError("Instance '%s' not known" %
2373                                  self.op.instance_name)
2374
2375     # check bridges existance
2376     _CheckInstanceBridgesExist(instance)
2377
2378     self.instance = instance
2379     self.op.instance_name = instance.name
2380
2381   def Exec(self, feedback_fn):
2382     """Reboot the instance.
2383
2384     """
2385     instance = self.instance
2386     ignore_secondaries = self.op.ignore_secondaries
2387     reboot_type = self.op.reboot_type
2388     extra_args = getattr(self.op, "extra_args", "")
2389
2390     node_current = instance.primary_node
2391
2392     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2393                            constants.INSTANCE_REBOOT_HARD,
2394                            constants.INSTANCE_REBOOT_FULL]:
2395       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2396                                   (constants.INSTANCE_REBOOT_SOFT,
2397                                    constants.INSTANCE_REBOOT_HARD,
2398                                    constants.INSTANCE_REBOOT_FULL))
2399
2400     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2401                        constants.INSTANCE_REBOOT_HARD]:
2402       if not rpc.call_instance_reboot(node_current, instance,
2403                                       reboot_type, extra_args):
2404         raise errors.OpExecError("Could not reboot instance")
2405     else:
2406       if not rpc.call_instance_shutdown(node_current, instance):
2407         raise errors.OpExecError("could not shutdown instance for full reboot")
2408       _ShutdownInstanceDisks(instance, self.cfg)
2409       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2410       if not rpc.call_instance_start(node_current, instance, extra_args):
2411         _ShutdownInstanceDisks(instance, self.cfg)
2412         raise errors.OpExecError("Could not start instance for full reboot")
2413
2414     self.cfg.MarkInstanceUp(instance.name)
2415
2416
2417 class LUShutdownInstance(LogicalUnit):
2418   """Shutdown an instance.
2419
2420   """
2421   HPATH = "instance-stop"
2422   HTYPE = constants.HTYPE_INSTANCE
2423   _OP_REQP = ["instance_name"]
2424
2425   def BuildHooksEnv(self):
2426     """Build hooks env.
2427
2428     This runs on master, primary and secondary nodes of the instance.
2429
2430     """
2431     env = _BuildInstanceHookEnvByObject(self.instance)
2432     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2433           list(self.instance.secondary_nodes))
2434     return env, nl, nl
2435
2436   def CheckPrereq(self):
2437     """Check prerequisites.
2438
2439     This checks that the instance is in the cluster.
2440
2441     """
2442     instance = self.cfg.GetInstanceInfo(
2443       self.cfg.ExpandInstanceName(self.op.instance_name))
2444     if instance is None:
2445       raise errors.OpPrereqError("Instance '%s' not known" %
2446                                  self.op.instance_name)
2447     self.instance = instance
2448
2449   def Exec(self, feedback_fn):
2450     """Shutdown the instance.
2451
2452     """
2453     instance = self.instance
2454     node_current = instance.primary_node
2455     self.cfg.MarkInstanceDown(instance.name)
2456     if not rpc.call_instance_shutdown(node_current, instance):
2457       logger.Error("could not shutdown instance")
2458
2459     _ShutdownInstanceDisks(instance, self.cfg)
2460
2461
2462 class LUReinstallInstance(LogicalUnit):
2463   """Reinstall an instance.
2464
2465   """
2466   HPATH = "instance-reinstall"
2467   HTYPE = constants.HTYPE_INSTANCE
2468   _OP_REQP = ["instance_name"]
2469
2470   def BuildHooksEnv(self):
2471     """Build hooks env.
2472
2473     This runs on master, primary and secondary nodes of the instance.
2474
2475     """
2476     env = _BuildInstanceHookEnvByObject(self.instance)
2477     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2478           list(self.instance.secondary_nodes))
2479     return env, nl, nl
2480
2481   def CheckPrereq(self):
2482     """Check prerequisites.
2483
2484     This checks that the instance is in the cluster and is not running.
2485
2486     """
2487     instance = self.cfg.GetInstanceInfo(
2488       self.cfg.ExpandInstanceName(self.op.instance_name))
2489     if instance is None:
2490       raise errors.OpPrereqError("Instance '%s' not known" %
2491                                  self.op.instance_name)
2492     if instance.disk_template == constants.DT_DISKLESS:
2493       raise errors.OpPrereqError("Instance '%s' has no disks" %
2494                                  self.op.instance_name)
2495     if instance.status != "down":
2496       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2497                                  self.op.instance_name)
2498     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2499     if remote_info:
2500       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2501                                  (self.op.instance_name,
2502                                   instance.primary_node))
2503
2504     self.op.os_type = getattr(self.op, "os_type", None)
2505     if self.op.os_type is not None:
2506       # OS verification
2507       pnode = self.cfg.GetNodeInfo(
2508         self.cfg.ExpandNodeName(instance.primary_node))
2509       if pnode is None:
2510         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2511                                    self.op.pnode)
2512       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2513       if not os_obj:
2514         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2515                                    " primary node"  % self.op.os_type)
2516
2517     self.instance = instance
2518
2519   def Exec(self, feedback_fn):
2520     """Reinstall the instance.
2521
2522     """
2523     inst = self.instance
2524
2525     if self.op.os_type is not None:
2526       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2527       inst.os = self.op.os_type
2528       self.cfg.AddInstance(inst)
2529
2530     _StartInstanceDisks(self.cfg, inst, None)
2531     try:
2532       feedback_fn("Running the instance OS create scripts...")
2533       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2534         raise errors.OpExecError("Could not install OS for instance %s"
2535                                  " on node %s" %
2536                                  (inst.name, inst.primary_node))
2537     finally:
2538       _ShutdownInstanceDisks(inst, self.cfg)
2539
2540
2541 class LURenameInstance(LogicalUnit):
2542   """Rename an instance.
2543
2544   """
2545   HPATH = "instance-rename"
2546   HTYPE = constants.HTYPE_INSTANCE
2547   _OP_REQP = ["instance_name", "new_name"]
2548
2549   def BuildHooksEnv(self):
2550     """Build hooks env.
2551
2552     This runs on master, primary and secondary nodes of the instance.
2553
2554     """
2555     env = _BuildInstanceHookEnvByObject(self.instance)
2556     env["INSTANCE_NEW_NAME"] = self.op.new_name
2557     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2558           list(self.instance.secondary_nodes))
2559     return env, nl, nl
2560
2561   def CheckPrereq(self):
2562     """Check prerequisites.
2563
2564     This checks that the instance is in the cluster and is not running.
2565
2566     """
2567     instance = self.cfg.GetInstanceInfo(
2568       self.cfg.ExpandInstanceName(self.op.instance_name))
2569     if instance is None:
2570       raise errors.OpPrereqError("Instance '%s' not known" %
2571                                  self.op.instance_name)
2572     if instance.status != "down":
2573       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2574                                  self.op.instance_name)
2575     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2576     if remote_info:
2577       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2578                                  (self.op.instance_name,
2579                                   instance.primary_node))
2580     self.instance = instance
2581
2582     # new name verification
2583     name_info = utils.HostInfo(self.op.new_name)
2584
2585     self.op.new_name = new_name = name_info.name
2586     instance_list = self.cfg.GetInstanceList()
2587     if new_name in instance_list:
2588       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2589                                  new_name)
2590
2591     if not getattr(self.op, "ignore_ip", False):
2592       command = ["fping", "-q", name_info.ip]
2593       result = utils.RunCmd(command)
2594       if not result.failed:
2595         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2596                                    (name_info.ip, new_name))
2597
2598
2599   def Exec(self, feedback_fn):
2600     """Reinstall the instance.
2601
2602     """
2603     inst = self.instance
2604     old_name = inst.name
2605
2606     self.cfg.RenameInstance(inst.name, self.op.new_name)
2607
2608     # re-read the instance from the configuration after rename
2609     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2610
2611     _StartInstanceDisks(self.cfg, inst, None)
2612     try:
2613       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2614                                           "sda", "sdb"):
2615         msg = ("Could run OS rename script for instance %s on node %s (but the"
2616                " instance has been renamed in Ganeti)" %
2617                (inst.name, inst.primary_node))
2618         logger.Error(msg)
2619     finally:
2620       _ShutdownInstanceDisks(inst, self.cfg)
2621
2622
2623 class LURemoveInstance(LogicalUnit):
2624   """Remove an instance.
2625
2626   """
2627   HPATH = "instance-remove"
2628   HTYPE = constants.HTYPE_INSTANCE
2629   _OP_REQP = ["instance_name", "ignore_failures"]
2630
2631   def BuildHooksEnv(self):
2632     """Build hooks env.
2633
2634     This runs on master, primary and secondary nodes of the instance.
2635
2636     """
2637     env = _BuildInstanceHookEnvByObject(self.instance)
2638     nl = [self.sstore.GetMasterNode()]
2639     return env, nl, nl
2640
2641   def CheckPrereq(self):
2642     """Check prerequisites.
2643
2644     This checks that the instance is in the cluster.
2645
2646     """
2647     instance = self.cfg.GetInstanceInfo(
2648       self.cfg.ExpandInstanceName(self.op.instance_name))
2649     if instance is None:
2650       raise errors.OpPrereqError("Instance '%s' not known" %
2651                                  self.op.instance_name)
2652     self.instance = instance
2653
2654   def Exec(self, feedback_fn):
2655     """Remove the instance.
2656
2657     """
2658     instance = self.instance
2659     logger.Info("shutting down instance %s on node %s" %
2660                 (instance.name, instance.primary_node))
2661
2662     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2663       if self.op.ignore_failures:
2664         feedback_fn("Warning: can't shutdown instance")
2665       else:
2666         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2667                                  (instance.name, instance.primary_node))
2668
2669     logger.Info("removing block devices for instance %s" % instance.name)
2670
2671     if not _RemoveDisks(instance, self.cfg):
2672       if self.op.ignore_failures:
2673         feedback_fn("Warning: can't remove instance's disks")
2674       else:
2675         raise errors.OpExecError("Can't remove instance's disks")
2676
2677     logger.Info("removing instance %s out of cluster config" % instance.name)
2678
2679     self.cfg.RemoveInstance(instance.name)
2680
2681
2682 class LUQueryInstances(NoHooksLU):
2683   """Logical unit for querying instances.
2684
2685   """
2686   _OP_REQP = ["output_fields", "names"]
2687
2688   def CheckPrereq(self):
2689     """Check prerequisites.
2690
2691     This checks that the fields required are valid output fields.
2692
2693     """
2694     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2695     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2696                                "admin_state", "admin_ram",
2697                                "disk_template", "ip", "mac", "bridge",
2698                                "sda_size", "sdb_size", "vcpus", "tags"],
2699                        dynamic=self.dynamic_fields,
2700                        selected=self.op.output_fields)
2701
2702     self.wanted = _GetWantedInstances(self, self.op.names)
2703
2704   def Exec(self, feedback_fn):
2705     """Computes the list of nodes and their attributes.
2706
2707     """
2708     instance_names = self.wanted
2709     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2710                      in instance_names]
2711
2712     # begin data gathering
2713
2714     nodes = frozenset([inst.primary_node for inst in instance_list])
2715
2716     bad_nodes = []
2717     if self.dynamic_fields.intersection(self.op.output_fields):
2718       live_data = {}
2719       node_data = rpc.call_all_instances_info(nodes)
2720       for name in nodes:
2721         result = node_data[name]
2722         if result:
2723           live_data.update(result)
2724         elif result == False:
2725           bad_nodes.append(name)
2726         # else no instance is alive
2727     else:
2728       live_data = dict([(name, {}) for name in instance_names])
2729
2730     # end data gathering
2731
2732     output = []
2733     for instance in instance_list:
2734       iout = []
2735       for field in self.op.output_fields:
2736         if field == "name":
2737           val = instance.name
2738         elif field == "os":
2739           val = instance.os
2740         elif field == "pnode":
2741           val = instance.primary_node
2742         elif field == "snodes":
2743           val = list(instance.secondary_nodes)
2744         elif field == "admin_state":
2745           val = (instance.status != "down")
2746         elif field == "oper_state":
2747           if instance.primary_node in bad_nodes:
2748             val = None
2749           else:
2750             val = bool(live_data.get(instance.name))
2751         elif field == "status":
2752           if instance.primary_node in bad_nodes:
2753             val = "ERROR_nodedown"
2754           else:
2755             running = bool(live_data.get(instance.name))
2756             if running:
2757               if instance.status != "down":
2758                 val = "running"
2759               else:
2760                 val = "ERROR_up"
2761             else:
2762               if instance.status != "down":
2763                 val = "ERROR_down"
2764               else:
2765                 val = "ADMIN_down"
2766         elif field == "admin_ram":
2767           val = instance.memory
2768         elif field == "oper_ram":
2769           if instance.primary_node in bad_nodes:
2770             val = None
2771           elif instance.name in live_data:
2772             val = live_data[instance.name].get("memory", "?")
2773           else:
2774             val = "-"
2775         elif field == "disk_template":
2776           val = instance.disk_template
2777         elif field == "ip":
2778           val = instance.nics[0].ip
2779         elif field == "bridge":
2780           val = instance.nics[0].bridge
2781         elif field == "mac":
2782           val = instance.nics[0].mac
2783         elif field == "sda_size" or field == "sdb_size":
2784           disk = instance.FindDisk(field[:3])
2785           if disk is None:
2786             val = None
2787           else:
2788             val = disk.size
2789         elif field == "vcpus":
2790           val = instance.vcpus
2791         elif field == "tags":
2792           val = list(instance.GetTags())
2793         else:
2794           raise errors.ParameterError(field)
2795         iout.append(val)
2796       output.append(iout)
2797
2798     return output
2799
2800
2801 class LUFailoverInstance(LogicalUnit):
2802   """Failover an instance.
2803
2804   """
2805   HPATH = "instance-failover"
2806   HTYPE = constants.HTYPE_INSTANCE
2807   _OP_REQP = ["instance_name", "ignore_consistency"]
2808
2809   def BuildHooksEnv(self):
2810     """Build hooks env.
2811
2812     This runs on master, primary and secondary nodes of the instance.
2813
2814     """
2815     env = {
2816       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2817       }
2818     env.update(_BuildInstanceHookEnvByObject(self.instance))
2819     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2820     return env, nl, nl
2821
2822   def CheckPrereq(self):
2823     """Check prerequisites.
2824
2825     This checks that the instance is in the cluster.
2826
2827     """
2828     instance = self.cfg.GetInstanceInfo(
2829       self.cfg.ExpandInstanceName(self.op.instance_name))
2830     if instance is None:
2831       raise errors.OpPrereqError("Instance '%s' not known" %
2832                                  self.op.instance_name)
2833
2834     if instance.disk_template not in constants.DTS_NET_MIRROR:
2835       raise errors.OpPrereqError("Instance's disk layout is not"
2836                                  " network mirrored, cannot failover.")
2837
2838     secondary_nodes = instance.secondary_nodes
2839     if not secondary_nodes:
2840       raise errors.ProgrammerError("no secondary node but using "
2841                                    "DT_REMOTE_RAID1 template")
2842
2843     target_node = secondary_nodes[0]
2844     # check memory requirements on the secondary node
2845     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2846                          instance.name, instance.memory)
2847
2848     # check bridge existance
2849     brlist = [nic.bridge for nic in instance.nics]
2850     if not rpc.call_bridges_exist(target_node, brlist):
2851       raise errors.OpPrereqError("One or more target bridges %s does not"
2852                                  " exist on destination node '%s'" %
2853                                  (brlist, target_node))
2854
2855     self.instance = instance
2856
2857   def Exec(self, feedback_fn):
2858     """Failover an instance.
2859
2860     The failover is done by shutting it down on its present node and
2861     starting it on the secondary.
2862
2863     """
2864     instance = self.instance
2865
2866     source_node = instance.primary_node
2867     target_node = instance.secondary_nodes[0]
2868
2869     feedback_fn("* checking disk consistency between source and target")
2870     for dev in instance.disks:
2871       # for remote_raid1, these are md over drbd
2872       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2873         if instance.status == "up" and not self.op.ignore_consistency:
2874           raise errors.OpExecError("Disk %s is degraded on target node,"
2875                                    " aborting failover." % dev.iv_name)
2876
2877     feedback_fn("* shutting down instance on source node")
2878     logger.Info("Shutting down instance %s on node %s" %
2879                 (instance.name, source_node))
2880
2881     if not rpc.call_instance_shutdown(source_node, instance):
2882       if self.op.ignore_consistency:
2883         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2884                      " anyway. Please make sure node %s is down"  %
2885                      (instance.name, source_node, source_node))
2886       else:
2887         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2888                                  (instance.name, source_node))
2889
2890     feedback_fn("* deactivating the instance's disks on source node")
2891     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2892       raise errors.OpExecError("Can't shut down the instance's disks.")
2893
2894     instance.primary_node = target_node
2895     # distribute new instance config to the other nodes
2896     self.cfg.Update(instance)
2897
2898     # Only start the instance if it's marked as up
2899     if instance.status == "up":
2900       feedback_fn("* activating the instance's disks on target node")
2901       logger.Info("Starting instance %s on node %s" %
2902                   (instance.name, target_node))
2903
2904       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2905                                                ignore_secondaries=True)
2906       if not disks_ok:
2907         _ShutdownInstanceDisks(instance, self.cfg)
2908         raise errors.OpExecError("Can't activate the instance's disks")
2909
2910       feedback_fn("* starting the instance on the target node")
2911       if not rpc.call_instance_start(target_node, instance, None):
2912         _ShutdownInstanceDisks(instance, self.cfg)
2913         raise errors.OpExecError("Could not start instance %s on node %s." %
2914                                  (instance.name, target_node))
2915
2916
2917 class LUMigrateInstance(LogicalUnit):
2918   """Migrate an instance.
2919
2920   This is migration without shutting down, compared to the failover,
2921   which is done with shutdown.
2922
2923   """
2924   HPATH = "instance-migrate"
2925   HTYPE = constants.HTYPE_INSTANCE
2926   _OP_REQP = ["instance_name", "live"]
2927
2928   def BuildHooksEnv(self):
2929     """Build hooks env.
2930
2931     This runs on master, primary and secondary nodes of the instance.
2932
2933     """
2934     env = _BuildInstanceHookEnvByObject(self.instance)
2935     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2936     return env, nl, nl
2937
2938   def CheckPrereq(self):
2939     """Check prerequisites.
2940
2941     This checks that the instance is in the cluster.
2942
2943     """
2944     instance = self.cfg.GetInstanceInfo(
2945       self.cfg.ExpandInstanceName(self.op.instance_name))
2946     if instance is None:
2947       raise errors.OpPrereqError("Instance '%s' not known" %
2948                                  self.op.instance_name)
2949
2950     if instance.disk_template != constants.DT_DRBD8:
2951       raise errors.OpPrereqError("Instance's disk layout is not"
2952                                  " drbd8, cannot migrate.")
2953
2954     secondary_nodes = instance.secondary_nodes
2955     if not secondary_nodes:
2956       raise errors.ProgrammerError("no secondary node but using "
2957                                    "drbd8 disk template")
2958
2959     target_node = secondary_nodes[0]
2960     # check memory requirements on the secondary node
2961     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2962                          instance.name, instance.memory)
2963
2964     # check bridge existance
2965     brlist = [nic.bridge for nic in instance.nics]
2966     if not rpc.call_bridges_exist(target_node, brlist):
2967       raise errors.OpPrereqError("One or more target bridges %s does not"
2968                                  " exist on destination node '%s'" %
2969                                  (brlist, target_node))
2970
2971     ins_l = rpc.call_instance_list([instance.primary_node])
2972     ins_l = ins_l[instance.primary_node]
2973     if not type(ins_l) is list:
2974       raise errors.OpPrereqError("Can't contact node '%s'" %
2975                                instance.primary_node)
2976
2977     if instance.name not in ins_l:
2978       raise errors.OpPrereqError("Instance is not running, can't migrate"
2979                                  " - please use failover instead")
2980     self.instance = instance
2981
2982   def Exec(self, feedback_fn):
2983     """Migrate an instance.
2984
2985     The migrate is done by:
2986       - change the disks into dual-master mode
2987       - wait until disks are fully synchronized again
2988       - migrate the instance
2989       - change disks on the new secondary node (the old primary) to secondary
2990       - wait until disks are fully synchronized
2991       - change disks into single-master mode
2992
2993     """
2994     instance = self.instance
2995
2996     source_node = instance.primary_node
2997     target_node = instance.secondary_nodes[0]
2998     all_nodes = [source_node, target_node]
2999
3000     feedback_fn("* checking disk consistency between source and target")
3001     for dev in instance.disks:
3002       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3003         raise errors.OpExecError("Disk %s is degraded or not fully"
3004                                  " synchronized on target node,"
3005                                  " aborting migrate." % dev.iv_name)
3006
3007     feedback_fn("* ensuring the target is in secondary mode")
3008     result = rpc.call_blockdev_close(target_node, instance.name,
3009                                      instance.disks)
3010     if not result or not result[0]:
3011       raise errors.BlockDeviceError("Cannot switch target node to"
3012                                     " secondary: %s" % result[1])
3013
3014     feedback_fn("* changing disks into dual-master mode")
3015     logger.Info("Changing disks for instance %s into dual-master mode" %
3016                 (instance.name,))
3017
3018     nodes_ip = {
3019       source_node: self.cfg.GetNodeInfo(source_node).secondary_ip,
3020       target_node: self.cfg.GetNodeInfo(target_node).secondary_ip,
3021       }
3022     result = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3023                                         instance.disks, nodes_ip, True)
3024     for node in all_nodes:
3025       if not result[node] or not result[node][0]:
3026         raise errors.OpExecError("Cannot change disks config on node %s,"
3027                                  " error %s" % (node, result[node][1]))
3028
3029     _WaitForSync(self.cfg, instance, self.proc)
3030
3031     feedback_fn("* migrating instance to %s" % target_node)
3032     result = rpc.call_instance_migrate(source_node, instance,
3033                                        nodes_ip[target_node], self.op.live)
3034     if not result or not result[0]:
3035       logger.Error("Instance migration failed, trying to revert disk status")
3036       res2 = rpc.call_blockdev_close(target_node, instance.name,
3037                                      instance.disks)
3038       if not res2 or not res2[0]:
3039         feedback_fn("Disk close on secondary failed, instance disks left"
3040                     " in dual-master mode")
3041       else:
3042         res2 = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3043                                           instance.disks, nodes_ip, False)
3044         if not (res2 and res2[source_node][0] and res2[target_node][0]):
3045           logger.Error("Warning: DRBD change to single-master failed: shutdown"
3046                        " manually the instance disks or via deactivate-disks,"
3047                        " and then restart them via activate-disks")
3048       raise errors.OpExecError("Could not migrate instance %s: %s" %
3049                                (instance.name, result[1]))
3050
3051     instance.primary_node = target_node
3052     # distribute new instance config to the other nodes
3053     self.cfg.Update(instance)
3054
3055     feedback_fn("* changing the instance's disks on source node to secondary")
3056     result = rpc.call_blockdev_close(source_node, instance.name,
3057                                      instance.disks)
3058     if not result or not result[0]:
3059       logger.Error("Warning: DRBD deactivation failed: shutdown manually the"
3060                    " instance disks or via deactivate-disks, and then"
3061                    " restart them via activate-disks")
3062
3063     _WaitForSync(self.cfg, instance, self.proc)
3064
3065     feedback_fn("* changing the instance's disks to single-master")
3066     result = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3067                                         instance.disks, nodes_ip, False)
3068     if not (result and result[source_node][0] and result[target_node][0]):
3069       logger.Error("Warning: DRBD change to single-master failed: shutdown"
3070                    " manually the instance disks or via deactivate-disks,"
3071                    " and then restart them via activate-disks")
3072
3073
3074 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3075   """Create a tree of block devices on the primary node.
3076
3077   This always creates all devices.
3078
3079   """
3080   if device.children:
3081     for child in device.children:
3082       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3083         return False
3084
3085   cfg.SetDiskID(device, node)
3086   new_id = rpc.call_blockdev_create(node, device, device.size,
3087                                     instance.name, True, info)
3088   if not new_id:
3089     return False
3090   if device.physical_id is None:
3091     device.physical_id = new_id
3092   return True
3093
3094
3095 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3096   """Create a tree of block devices on a secondary node.
3097
3098   If this device type has to be created on secondaries, create it and
3099   all its children.
3100
3101   If not, just recurse to children keeping the same 'force' value.
3102
3103   """
3104   if device.CreateOnSecondary():
3105     force = True
3106   if device.children:
3107     for child in device.children:
3108       if not _CreateBlockDevOnSecondary(cfg, node, instance,
3109                                         child, force, info):
3110         return False
3111
3112   if not force:
3113     return True
3114   cfg.SetDiskID(device, node)
3115   new_id = rpc.call_blockdev_create(node, device, device.size,
3116                                     instance.name, False, info)
3117   if not new_id:
3118     return False
3119   if device.physical_id is None:
3120     device.physical_id = new_id
3121   return True
3122
3123
3124 def _GenerateUniqueNames(cfg, exts):
3125   """Generate a suitable LV name.
3126
3127   This will generate a logical volume name for the given instance.
3128
3129   """
3130   results = []
3131   for val in exts:
3132     new_id = cfg.GenerateUniqueID()
3133     results.append("%s%s" % (new_id, val))
3134   return results
3135
3136
3137 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3138   """Generate a drbd device complete with its children.
3139
3140   """
3141   port = cfg.AllocatePort()
3142   vgname = cfg.GetVGName()
3143   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3144                           logical_id=(vgname, names[0]))
3145   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3146                           logical_id=(vgname, names[1]))
3147   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3148                           logical_id = (primary, secondary, port),
3149                           children = [dev_data, dev_meta])
3150   return drbd_dev
3151
3152
3153 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3154   """Generate a drbd8 device complete with its children.
3155
3156   """
3157   port = cfg.AllocatePort()
3158   vgname = cfg.GetVGName()
3159   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3160                           logical_id=(vgname, names[0]))
3161   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3162                           logical_id=(vgname, names[1]))
3163   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3164                           logical_id = (primary, secondary, port),
3165                           children = [dev_data, dev_meta],
3166                           iv_name=iv_name)
3167   return drbd_dev
3168
3169 def _GenerateDiskTemplate(cfg, template_name,
3170                           instance_name, primary_node,
3171                           secondary_nodes, disk_sz, swap_sz):
3172   """Generate the entire disk layout for a given template type.
3173
3174   """
3175   #TODO: compute space requirements
3176
3177   vgname = cfg.GetVGName()
3178   if template_name == constants.DT_DISKLESS:
3179     disks = []
3180   elif template_name == constants.DT_PLAIN:
3181     if len(secondary_nodes) != 0:
3182       raise errors.ProgrammerError("Wrong template configuration")
3183
3184     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3185     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3186                            logical_id=(vgname, names[0]),
3187                            iv_name = "sda")
3188     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3189                            logical_id=(vgname, names[1]),
3190                            iv_name = "sdb")
3191     disks = [sda_dev, sdb_dev]
3192   elif template_name == constants.DT_LOCAL_RAID1:
3193     if len(secondary_nodes) != 0:
3194       raise errors.ProgrammerError("Wrong template configuration")
3195
3196
3197     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3198                                        ".sdb_m1", ".sdb_m2"])
3199     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3200                               logical_id=(vgname, names[0]))
3201     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3202                               logical_id=(vgname, names[1]))
3203     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3204                               size=disk_sz,
3205                               children = [sda_dev_m1, sda_dev_m2])
3206     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3207                               logical_id=(vgname, names[2]))
3208     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3209                               logical_id=(vgname, names[3]))
3210     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3211                               size=swap_sz,
3212                               children = [sdb_dev_m1, sdb_dev_m2])
3213     disks = [md_sda_dev, md_sdb_dev]
3214   elif template_name == constants.DT_REMOTE_RAID1:
3215     if len(secondary_nodes) != 1:
3216       raise errors.ProgrammerError("Wrong template configuration")
3217     remote_node = secondary_nodes[0]
3218     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3219                                        ".sdb_data", ".sdb_meta"])
3220     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3221                                          disk_sz, names[0:2])
3222     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3223                               children = [drbd_sda_dev], size=disk_sz)
3224     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3225                                          swap_sz, names[2:4])
3226     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3227                               children = [drbd_sdb_dev], size=swap_sz)
3228     disks = [md_sda_dev, md_sdb_dev]
3229   elif template_name == constants.DT_DRBD8:
3230     if len(secondary_nodes) != 1:
3231       raise errors.ProgrammerError("Wrong template configuration")
3232     remote_node = secondary_nodes[0]
3233     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3234                                        ".sdb_data", ".sdb_meta"])
3235     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3236                                          disk_sz, names[0:2], "sda")
3237     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3238                                          swap_sz, names[2:4], "sdb")
3239     disks = [drbd_sda_dev, drbd_sdb_dev]
3240   else:
3241     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3242   return disks
3243
3244
3245 def _GetInstanceInfoText(instance):
3246   """Compute that text that should be added to the disk's metadata.
3247
3248   """
3249   return "originstname+%s" % instance.name
3250
3251
3252 def _CreateDisks(cfg, instance):
3253   """Create all disks for an instance.
3254
3255   This abstracts away some work from AddInstance.
3256
3257   Args:
3258     instance: the instance object
3259
3260   Returns:
3261     True or False showing the success of the creation process
3262
3263   """
3264   info = _GetInstanceInfoText(instance)
3265
3266   for device in instance.disks:
3267     logger.Info("creating volume %s for instance %s" %
3268               (device.iv_name, instance.name))
3269     #HARDCODE
3270     for secondary_node in instance.secondary_nodes:
3271       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3272                                         device, False, info):
3273         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3274                      (device.iv_name, device, secondary_node))
3275         return False
3276     #HARDCODE
3277     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3278                                     instance, device, info):
3279       logger.Error("failed to create volume %s on primary!" %
3280                    device.iv_name)
3281       return False
3282   return True
3283
3284
3285 def _RemoveDisks(instance, cfg):
3286   """Remove all disks for an instance.
3287
3288   This abstracts away some work from `AddInstance()` and
3289   `RemoveInstance()`. Note that in case some of the devices couldn't
3290   be removed, the removal will continue with the other ones (compare
3291   with `_CreateDisks()`).
3292
3293   Args:
3294     instance: the instance object
3295
3296   Returns:
3297     True or False showing the success of the removal proces
3298
3299   """
3300   logger.Info("removing block devices for instance %s" % instance.name)
3301
3302   result = True
3303   for device in instance.disks:
3304     for node, disk in device.ComputeNodeTree(instance.primary_node):
3305       cfg.SetDiskID(disk, node)
3306       if not rpc.call_blockdev_remove(node, disk):
3307         logger.Error("could not remove block device %s on node %s,"
3308                      " continuing anyway" %
3309                      (device.iv_name, node))
3310         result = False
3311   return result
3312
3313
3314 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3315   """Compute disk size requirements in the volume group
3316
3317   This is currently hard-coded for the two-drive layout.
3318
3319   """
3320   # Required free disk space as a function of disk and swap space
3321   req_size_dict = {
3322     constants.DT_DISKLESS: None,
3323     constants.DT_PLAIN: disk_size + swap_size,
3324     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3325     # 256 MB are added for drbd metadata, 128MB for each drbd device
3326     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3327     constants.DT_DRBD8: disk_size + swap_size + 256,
3328   }
3329
3330   if disk_template not in req_size_dict:
3331     raise errors.ProgrammerError("Disk template '%s' size requirement"
3332                                  " is unknown" %  disk_template)
3333
3334   return req_size_dict[disk_template]
3335
3336
3337 class LUCreateInstance(LogicalUnit):
3338   """Create an instance.
3339
3340   """
3341   HPATH = "instance-add"
3342   HTYPE = constants.HTYPE_INSTANCE
3343   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3344               "disk_template", "swap_size", "mode", "start", "vcpus",
3345               "wait_for_sync", "ip_check", "mac"]
3346
3347   def _RunAllocator(self):
3348     """Run the allocator based on input opcode.
3349
3350     """
3351     disks = [{"size": self.op.disk_size, "mode": "w"},
3352              {"size": self.op.swap_size, "mode": "w"}]
3353     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3354              "bridge": self.op.bridge}]
3355     ial = IAllocator(self.cfg, self.sstore,
3356                      mode=constants.IALLOCATOR_MODE_ALLOC,
3357                      name=self.op.instance_name,
3358                      disk_template=self.op.disk_template,
3359                      tags=[],
3360                      os=self.op.os_type,
3361                      vcpus=self.op.vcpus,
3362                      mem_size=self.op.mem_size,
3363                      disks=disks,
3364                      nics=nics,
3365                      )
3366
3367     ial.Run(self.op.iallocator)
3368
3369     if not ial.success:
3370       raise errors.OpPrereqError("Can't compute nodes using"
3371                                  " iallocator '%s': %s" % (self.op.iallocator,
3372                                                            ial.info))
3373     if len(ial.nodes) != ial.required_nodes:
3374       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3375                                  " of nodes (%s), required %s" %
3376                                  (len(ial.nodes), ial.required_nodes))
3377     self.op.pnode = ial.nodes[0]
3378     logger.ToStdout("Selected nodes for the instance: %s" %
3379                     (", ".join(ial.nodes),))
3380     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3381                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3382     if ial.required_nodes == 2:
3383       self.op.snode = ial.nodes[1]
3384
3385   def BuildHooksEnv(self):
3386     """Build hooks env.
3387
3388     This runs on master, primary and secondary nodes of the instance.
3389
3390     """
3391     env = {
3392       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3393       "INSTANCE_DISK_SIZE": self.op.disk_size,
3394       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3395       "INSTANCE_ADD_MODE": self.op.mode,
3396       }
3397     if self.op.mode == constants.INSTANCE_IMPORT:
3398       env["INSTANCE_SRC_NODE"] = self.op.src_node
3399       env["INSTANCE_SRC_PATH"] = self.op.src_path
3400       env["INSTANCE_SRC_IMAGE"] = self.src_image
3401
3402     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3403       primary_node=self.op.pnode,
3404       secondary_nodes=self.secondaries,
3405       status=self.instance_status,
3406       os_type=self.op.os_type,
3407       memory=self.op.mem_size,
3408       vcpus=self.op.vcpus,
3409       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3410     ))
3411
3412     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3413           self.secondaries)
3414     return env, nl, nl
3415
3416
3417   def CheckPrereq(self):
3418     """Check prerequisites.
3419
3420     """
3421     # set optional parameters to none if they don't exist
3422     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3423                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3424                  "vnc_bind_address"]:
3425       if not hasattr(self.op, attr):
3426         setattr(self.op, attr, None)
3427
3428     if self.op.mode not in (constants.INSTANCE_CREATE,
3429                             constants.INSTANCE_IMPORT):
3430       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3431                                  self.op.mode)
3432
3433     if self.op.mode == constants.INSTANCE_IMPORT:
3434       src_node = getattr(self.op, "src_node", None)
3435       src_path = getattr(self.op, "src_path", None)
3436       if src_node is None or src_path is None:
3437         raise errors.OpPrereqError("Importing an instance requires source"
3438                                    " node and path options")
3439       src_node_full = self.cfg.ExpandNodeName(src_node)
3440       if src_node_full is None:
3441         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3442       self.op.src_node = src_node = src_node_full
3443
3444       if not os.path.isabs(src_path):
3445         raise errors.OpPrereqError("The source path must be absolute")
3446
3447       export_info = rpc.call_export_info(src_node, src_path)
3448
3449       if not export_info:
3450         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3451
3452       if not export_info.has_section(constants.INISECT_EXP):
3453         raise errors.ProgrammerError("Corrupted export config")
3454
3455       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3456       if (int(ei_version) != constants.EXPORT_VERSION):
3457         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3458                                    (ei_version, constants.EXPORT_VERSION))
3459
3460       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3461         raise errors.OpPrereqError("Can't import instance with more than"
3462                                    " one data disk")
3463
3464       # FIXME: are the old os-es, disk sizes, etc. useful?
3465       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3466       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3467                                                          'disk0_dump'))
3468       self.src_image = diskimage
3469     else: # INSTANCE_CREATE
3470       if getattr(self.op, "os_type", None) is None:
3471         raise errors.OpPrereqError("No guest OS specified")
3472
3473     #### instance parameters check
3474
3475     # disk template and mirror node verification
3476     if self.op.disk_template not in constants.DISK_TEMPLATES:
3477       raise errors.OpPrereqError("Invalid disk template name")
3478
3479     # instance name verification
3480     hostname1 = utils.HostInfo(self.op.instance_name)
3481
3482     self.op.instance_name = instance_name = hostname1.name
3483     instance_list = self.cfg.GetInstanceList()
3484     if instance_name in instance_list:
3485       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3486                                  instance_name)
3487
3488     # ip validity checks
3489     ip = getattr(self.op, "ip", None)
3490     if ip is None or ip.lower() == "none":
3491       inst_ip = None
3492     elif ip.lower() == "auto":
3493       inst_ip = hostname1.ip
3494     else:
3495       if not utils.IsValidIP(ip):
3496         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3497                                    " like a valid IP" % ip)
3498       inst_ip = ip
3499     self.inst_ip = self.op.ip = inst_ip
3500
3501     if self.op.start and not self.op.ip_check:
3502       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3503                                  " adding an instance in start mode")
3504
3505     if self.op.ip_check:
3506       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3507         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3508                                    (hostname1.ip, instance_name))
3509
3510     # MAC address verification
3511     if self.op.mac != "auto":
3512       if not utils.IsValidMac(self.op.mac.lower()):
3513         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3514                                    self.op.mac)
3515
3516     # bridge verification
3517     bridge = getattr(self.op, "bridge", None)
3518     if bridge is None:
3519       self.op.bridge = self.cfg.GetDefBridge()
3520     else:
3521       self.op.bridge = bridge
3522
3523     # boot order verification
3524     if self.op.hvm_boot_order is not None:
3525       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3526         raise errors.OpPrereqError("invalid boot order specified,"
3527                                    " must be one or more of [acdn]")
3528     #### allocator run
3529
3530     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3531       raise errors.OpPrereqError("One and only one of iallocator and primary"
3532                                  " node must be given")
3533
3534     if self.op.iallocator is not None:
3535       self._RunAllocator()
3536
3537     #### node related checks
3538
3539     # check primary node
3540     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3541     if pnode is None:
3542       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3543                                  self.op.pnode)
3544     self.op.pnode = pnode.name
3545     self.pnode = pnode
3546     self.secondaries = []
3547
3548     # mirror node verification
3549     if self.op.disk_template in constants.DTS_NET_MIRROR:
3550       if getattr(self.op, "snode", None) is None:
3551         raise errors.OpPrereqError("The networked disk templates need"
3552                                    " a mirror node")
3553
3554       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3555       if snode_name is None:
3556         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3557                                    self.op.snode)
3558       elif snode_name == pnode.name:
3559         raise errors.OpPrereqError("The secondary node cannot be"
3560                                    " the primary node.")
3561       self.secondaries.append(snode_name)
3562
3563     req_size = _ComputeDiskSize(self.op.disk_template,
3564                                 self.op.disk_size, self.op.swap_size)
3565
3566     # Check lv size requirements
3567     if req_size is not None:
3568       nodenames = [pnode.name] + self.secondaries
3569       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3570       for node in nodenames:
3571         info = nodeinfo.get(node, None)
3572         if not info:
3573           raise errors.OpPrereqError("Cannot get current information"
3574                                      " from node '%s'" % node)
3575         vg_free = info.get('vg_free', None)
3576         if not isinstance(vg_free, int):
3577           raise errors.OpPrereqError("Can't compute free disk space on"
3578                                      " node %s" % node)
3579         if req_size > info['vg_free']:
3580           raise errors.OpPrereqError("Not enough disk space on target node %s."
3581                                      " %d MB available, %d MB required" %
3582                                      (node, info['vg_free'], req_size))
3583
3584     # os verification
3585     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3586     if not os_obj:
3587       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3588                                  " primary node"  % self.op.os_type)
3589
3590     if self.op.kernel_path == constants.VALUE_NONE:
3591       raise errors.OpPrereqError("Can't set instance kernel to none")
3592
3593
3594     # bridge check on primary node
3595     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3596       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3597                                  " destination node '%s'" %
3598                                  (self.op.bridge, pnode.name))
3599
3600     # memory check on primary node
3601     if self.op.start:
3602       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3603                            "creating instance %s" % self.op.instance_name,
3604                            self.op.mem_size)
3605
3606     # hvm_cdrom_image_path verification
3607     if self.op.hvm_cdrom_image_path is not None:
3608       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3609         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3610                                    " be an absolute path or None, not %s" %
3611                                    self.op.hvm_cdrom_image_path)
3612       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3613         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3614                                    " regular file or a symlink pointing to"
3615                                    " an existing regular file, not %s" %
3616                                    self.op.hvm_cdrom_image_path)
3617
3618     # vnc_bind_address verification
3619     if self.op.vnc_bind_address is not None:
3620       if not utils.IsValidIP(self.op.vnc_bind_address):
3621         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3622                                    " like a valid IP address" %
3623                                    self.op.vnc_bind_address)
3624
3625     if self.op.start:
3626       self.instance_status = 'up'
3627     else:
3628       self.instance_status = 'down'
3629
3630   def Exec(self, feedback_fn):
3631     """Create and add the instance to the cluster.
3632
3633     """
3634     instance = self.op.instance_name
3635     pnode_name = self.pnode.name
3636
3637     if self.op.mac == "auto":
3638       mac_address = self.cfg.GenerateMAC()
3639     else:
3640       mac_address = self.op.mac
3641
3642     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3643     if self.inst_ip is not None:
3644       nic.ip = self.inst_ip
3645
3646     ht_kind = self.sstore.GetHypervisorType()
3647     if ht_kind in constants.HTS_REQ_PORT:
3648       network_port = self.cfg.AllocatePort()
3649     else:
3650       network_port = None
3651
3652     if self.op.vnc_bind_address is None:
3653       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3654
3655     disks = _GenerateDiskTemplate(self.cfg,
3656                                   self.op.disk_template,
3657                                   instance, pnode_name,
3658                                   self.secondaries, self.op.disk_size,
3659                                   self.op.swap_size)
3660
3661     iobj = objects.Instance(name=instance, os=self.op.os_type,
3662                             primary_node=pnode_name,
3663                             memory=self.op.mem_size,
3664                             vcpus=self.op.vcpus,
3665                             nics=[nic], disks=disks,
3666                             disk_template=self.op.disk_template,
3667                             status=self.instance_status,
3668                             network_port=network_port,
3669                             kernel_path=self.op.kernel_path,
3670                             initrd_path=self.op.initrd_path,
3671                             hvm_boot_order=self.op.hvm_boot_order,
3672                             hvm_acpi=self.op.hvm_acpi,
3673                             hvm_pae=self.op.hvm_pae,
3674                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3675                             vnc_bind_address=self.op.vnc_bind_address,
3676                             )
3677
3678     feedback_fn("* creating instance disks...")
3679     if not _CreateDisks(self.cfg, iobj):
3680       _RemoveDisks(iobj, self.cfg)
3681       raise errors.OpExecError("Device creation failed, reverting...")
3682
3683     feedback_fn("adding instance %s to cluster config" % instance)
3684
3685     self.cfg.AddInstance(iobj)
3686
3687     if self.op.wait_for_sync:
3688       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3689     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3690       # make sure the disks are not degraded (still sync-ing is ok)
3691       time.sleep(15)
3692       feedback_fn("* checking mirrors status")
3693       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3694     else:
3695       disk_abort = False
3696
3697     if disk_abort:
3698       _RemoveDisks(iobj, self.cfg)
3699       self.cfg.RemoveInstance(iobj.name)
3700       raise errors.OpExecError("There are some degraded disks for"
3701                                " this instance")
3702
3703     feedback_fn("creating os for instance %s on node %s" %
3704                 (instance, pnode_name))
3705
3706     if iobj.disk_template != constants.DT_DISKLESS:
3707       if self.op.mode == constants.INSTANCE_CREATE:
3708         feedback_fn("* running the instance OS create scripts...")
3709         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3710           raise errors.OpExecError("could not add os for instance %s"
3711                                    " on node %s" %
3712                                    (instance, pnode_name))
3713
3714       elif self.op.mode == constants.INSTANCE_IMPORT:
3715         feedback_fn("* running the instance OS import scripts...")
3716         src_node = self.op.src_node
3717         src_image = self.src_image
3718         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3719                                                 src_node, src_image):
3720           raise errors.OpExecError("Could not import os for instance"
3721                                    " %s on node %s" %
3722                                    (instance, pnode_name))
3723       else:
3724         # also checked in the prereq part
3725         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3726                                      % self.op.mode)
3727
3728     if self.op.start:
3729       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3730       feedback_fn("* starting instance...")
3731       if not rpc.call_instance_start(pnode_name, iobj, None):
3732         raise errors.OpExecError("Could not start instance")
3733
3734
3735 class LUConnectConsole(NoHooksLU):
3736   """Connect to an instance's console.
3737
3738   This is somewhat special in that it returns the command line that
3739   you need to run on the master node in order to connect to the
3740   console.
3741
3742   """
3743   _OP_REQP = ["instance_name"]
3744
3745   def CheckPrereq(self):
3746     """Check prerequisites.
3747
3748     This checks that the instance is in the cluster.
3749
3750     """
3751     instance = self.cfg.GetInstanceInfo(
3752       self.cfg.ExpandInstanceName(self.op.instance_name))
3753     if instance is None:
3754       raise errors.OpPrereqError("Instance '%s' not known" %
3755                                  self.op.instance_name)
3756     self.instance = instance
3757
3758   def Exec(self, feedback_fn):
3759     """Connect to the console of an instance
3760
3761     """
3762     instance = self.instance
3763     node = instance.primary_node
3764
3765     node_insts = rpc.call_instance_list([node])[node]
3766     if node_insts is False:
3767       raise errors.OpExecError("Can't connect to node %s." % node)
3768
3769     if instance.name not in node_insts:
3770       raise errors.OpExecError("Instance %s is not running." % instance.name)
3771
3772     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3773
3774     hyper = hypervisor.GetHypervisor()
3775     console_cmd = hyper.GetShellCommandForConsole(instance)
3776     # build ssh cmdline
3777     argv = ["ssh", "-q", "-t"]
3778     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3779     argv.extend(ssh.BATCH_MODE_OPTS)
3780     argv.append(node)
3781     argv.append(console_cmd)
3782     return "ssh", argv
3783
3784
3785 class LUAddMDDRBDComponent(LogicalUnit):
3786   """Adda new mirror member to an instance's disk.
3787
3788   """
3789   HPATH = "mirror-add"
3790   HTYPE = constants.HTYPE_INSTANCE
3791   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3792
3793   def BuildHooksEnv(self):
3794     """Build hooks env.
3795
3796     This runs on the master, the primary and all the secondaries.
3797
3798     """
3799     env = {
3800       "NEW_SECONDARY": self.op.remote_node,
3801       "DISK_NAME": self.op.disk_name,
3802       }
3803     env.update(_BuildInstanceHookEnvByObject(self.instance))
3804     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3805           self.op.remote_node,] + list(self.instance.secondary_nodes)
3806     return env, nl, nl
3807
3808   def CheckPrereq(self):
3809     """Check prerequisites.
3810
3811     This checks that the instance is in the cluster.
3812
3813     """
3814     instance = self.cfg.GetInstanceInfo(
3815       self.cfg.ExpandInstanceName(self.op.instance_name))
3816     if instance is None:
3817       raise errors.OpPrereqError("Instance '%s' not known" %
3818                                  self.op.instance_name)
3819     self.instance = instance
3820
3821     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3822     if remote_node is None:
3823       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3824     self.remote_node = remote_node
3825
3826     if remote_node == instance.primary_node:
3827       raise errors.OpPrereqError("The specified node is the primary node of"
3828                                  " the instance.")
3829
3830     if instance.disk_template != constants.DT_REMOTE_RAID1:
3831       raise errors.OpPrereqError("Instance's disk layout is not"
3832                                  " remote_raid1.")
3833     for disk in instance.disks:
3834       if disk.iv_name == self.op.disk_name:
3835         break
3836     else:
3837       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3838                                  " instance." % self.op.disk_name)
3839     if len(disk.children) > 1:
3840       raise errors.OpPrereqError("The device already has two slave devices."
3841                                  " This would create a 3-disk raid1 which we"
3842                                  " don't allow.")
3843     self.disk = disk
3844
3845   def Exec(self, feedback_fn):
3846     """Add the mirror component
3847
3848     """
3849     disk = self.disk
3850     instance = self.instance
3851
3852     remote_node = self.remote_node
3853     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3854     names = _GenerateUniqueNames(self.cfg, lv_names)
3855     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3856                                      remote_node, disk.size, names)
3857
3858     logger.Info("adding new mirror component on secondary")
3859     #HARDCODE
3860     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3861                                       new_drbd, False,
3862                                       _GetInstanceInfoText(instance)):
3863       raise errors.OpExecError("Failed to create new component on secondary"
3864                                " node %s" % remote_node)
3865
3866     logger.Info("adding new mirror component on primary")
3867     #HARDCODE
3868     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3869                                     instance, new_drbd,
3870                                     _GetInstanceInfoText(instance)):
3871       # remove secondary dev
3872       self.cfg.SetDiskID(new_drbd, remote_node)
3873       rpc.call_blockdev_remove(remote_node, new_drbd)
3874       raise errors.OpExecError("Failed to create volume on primary")
3875
3876     # the device exists now
3877     # call the primary node to add the mirror to md
3878     logger.Info("adding new mirror component to md")
3879     if not rpc.call_blockdev_addchildren(instance.primary_node,
3880                                          disk, [new_drbd]):
3881       logger.Error("Can't add mirror compoment to md!")
3882       self.cfg.SetDiskID(new_drbd, remote_node)
3883       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3884         logger.Error("Can't rollback on secondary")
3885       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3886       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3887         logger.Error("Can't rollback on primary")
3888       raise errors.OpExecError("Can't add mirror component to md array")
3889
3890     disk.children.append(new_drbd)
3891
3892     self.cfg.AddInstance(instance)
3893
3894     _WaitForSync(self.cfg, instance, self.proc)
3895
3896     return 0
3897
3898
3899 class LURemoveMDDRBDComponent(LogicalUnit):
3900   """Remove a component from a remote_raid1 disk.
3901
3902   """
3903   HPATH = "mirror-remove"
3904   HTYPE = constants.HTYPE_INSTANCE
3905   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3906
3907   def BuildHooksEnv(self):
3908     """Build hooks env.
3909
3910     This runs on the master, the primary and all the secondaries.
3911
3912     """
3913     env = {
3914       "DISK_NAME": self.op.disk_name,
3915       "DISK_ID": self.op.disk_id,
3916       "OLD_SECONDARY": self.old_secondary,
3917       }
3918     env.update(_BuildInstanceHookEnvByObject(self.instance))
3919     nl = [self.sstore.GetMasterNode(),
3920           self.instance.primary_node] + list(self.instance.secondary_nodes)
3921     return env, nl, nl
3922
3923   def CheckPrereq(self):
3924     """Check prerequisites.
3925
3926     This checks that the instance is in the cluster.
3927
3928     """
3929     instance = self.cfg.GetInstanceInfo(
3930       self.cfg.ExpandInstanceName(self.op.instance_name))
3931     if instance is None:
3932       raise errors.OpPrereqError("Instance '%s' not known" %
3933                                  self.op.instance_name)
3934     self.instance = instance
3935
3936     if instance.disk_template != constants.DT_REMOTE_RAID1:
3937       raise errors.OpPrereqError("Instance's disk layout is not"
3938                                  " remote_raid1.")
3939     for disk in instance.disks:
3940       if disk.iv_name == self.op.disk_name:
3941         break
3942     else:
3943       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3944                                  " instance." % self.op.disk_name)
3945     for child in disk.children:
3946       if (child.dev_type == constants.LD_DRBD7 and
3947           child.logical_id[2] == self.op.disk_id):
3948         break
3949     else:
3950       raise errors.OpPrereqError("Can't find the device with this port.")
3951
3952     if len(disk.children) < 2:
3953       raise errors.OpPrereqError("Cannot remove the last component from"
3954                                  " a mirror.")
3955     self.disk = disk
3956     self.child = child
3957     if self.child.logical_id[0] == instance.primary_node:
3958       oid = 1
3959     else:
3960       oid = 0
3961     self.old_secondary = self.child.logical_id[oid]
3962
3963   def Exec(self, feedback_fn):
3964     """Remove the mirror component
3965
3966     """
3967     instance = self.instance
3968     disk = self.disk
3969     child = self.child
3970     logger.Info("remove mirror component")
3971     self.cfg.SetDiskID(disk, instance.primary_node)
3972     if not rpc.call_blockdev_removechildren(instance.primary_node,
3973                                             disk, [child]):
3974       raise errors.OpExecError("Can't remove child from mirror.")
3975
3976     for node in child.logical_id[:2]:
3977       self.cfg.SetDiskID(child, node)
3978       if not rpc.call_blockdev_remove(node, child):
3979         logger.Error("Warning: failed to remove device from node %s,"
3980                      " continuing operation." % node)
3981
3982     disk.children.remove(child)
3983     self.cfg.AddInstance(instance)
3984
3985
3986 class LUReplaceDisks(LogicalUnit):
3987   """Replace the disks of an instance.
3988
3989   """
3990   HPATH = "mirrors-replace"
3991   HTYPE = constants.HTYPE_INSTANCE
3992   _OP_REQP = ["instance_name", "mode", "disks"]
3993
3994   def _RunAllocator(self):
3995     """Compute a new secondary node using an IAllocator.
3996
3997     """
3998     ial = IAllocator(self.cfg, self.sstore,
3999                      mode=constants.IALLOCATOR_MODE_RELOC,
4000                      name=self.op.instance_name,
4001                      relocate_from=[self.sec_node])
4002
4003     ial.Run(self.op.iallocator)
4004
4005     if not ial.success:
4006       raise errors.OpPrereqError("Can't compute nodes using"
4007                                  " iallocator '%s': %s" % (self.op.iallocator,
4008                                                            ial.info))
4009     if len(ial.nodes) != ial.required_nodes:
4010       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4011                                  " of nodes (%s), required %s" %
4012                                  (len(ial.nodes), ial.required_nodes))
4013     self.op.remote_node = ial.nodes[0]
4014     logger.ToStdout("Selected new secondary for the instance: %s" %
4015                     self.op.remote_node)
4016
4017   def BuildHooksEnv(self):
4018     """Build hooks env.
4019
4020     This runs on the master, the primary and all the secondaries.
4021
4022     """
4023     env = {
4024       "MODE": self.op.mode,
4025       "NEW_SECONDARY": self.op.remote_node,
4026       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4027       }
4028     env.update(_BuildInstanceHookEnvByObject(self.instance))
4029     nl = [
4030       self.sstore.GetMasterNode(),
4031       self.instance.primary_node,
4032       ]
4033     if self.op.remote_node is not None:
4034       nl.append(self.op.remote_node)
4035     return env, nl, nl
4036
4037   def CheckPrereq(self):
4038     """Check prerequisites.
4039
4040     This checks that the instance is in the cluster.
4041
4042     """
4043     if not hasattr(self.op, "remote_node"):
4044       self.op.remote_node = None
4045
4046     instance = self.cfg.GetInstanceInfo(
4047       self.cfg.ExpandInstanceName(self.op.instance_name))
4048     if instance is None:
4049       raise errors.OpPrereqError("Instance '%s' not known" %
4050                                  self.op.instance_name)
4051     self.instance = instance
4052     self.op.instance_name = instance.name
4053
4054     if instance.disk_template not in constants.DTS_NET_MIRROR:
4055       raise errors.OpPrereqError("Instance's disk layout is not"
4056                                  " network mirrored.")
4057
4058     if len(instance.secondary_nodes) != 1:
4059       raise errors.OpPrereqError("The instance has a strange layout,"
4060                                  " expected one secondary but found %d" %
4061                                  len(instance.secondary_nodes))
4062
4063     self.sec_node = instance.secondary_nodes[0]
4064
4065     ia_name = getattr(self.op, "iallocator", None)
4066     if ia_name is not None:
4067       if self.op.remote_node is not None:
4068         raise errors.OpPrereqError("Give either the iallocator or the new"
4069                                    " secondary, not both")
4070       self.op.remote_node = self._RunAllocator()
4071
4072     remote_node = self.op.remote_node
4073     if remote_node is not None:
4074       remote_node = self.cfg.ExpandNodeName(remote_node)
4075       if remote_node is None:
4076         raise errors.OpPrereqError("Node '%s' not known" %
4077                                    self.op.remote_node)
4078       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4079     else:
4080       self.remote_node_info = None
4081     if remote_node == instance.primary_node:
4082       raise errors.OpPrereqError("The specified node is the primary node of"
4083                                  " the instance.")
4084     elif remote_node == self.sec_node:
4085       if self.op.mode == constants.REPLACE_DISK_SEC:
4086         # this is for DRBD8, where we can't execute the same mode of
4087         # replacement as for drbd7 (no different port allocated)
4088         raise errors.OpPrereqError("Same secondary given, cannot execute"
4089                                    " replacement")
4090       # the user gave the current secondary, switch to
4091       # 'no-replace-secondary' mode for drbd7
4092       remote_node = None
4093     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4094         self.op.mode != constants.REPLACE_DISK_ALL):
4095       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4096                                  " disks replacement, not individual ones")
4097     if instance.disk_template == constants.DT_DRBD8:
4098       if (self.op.mode == constants.REPLACE_DISK_ALL and
4099           remote_node is not None):
4100         # switch to replace secondary mode
4101         self.op.mode = constants.REPLACE_DISK_SEC
4102
4103       if self.op.mode == constants.REPLACE_DISK_ALL:
4104         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4105                                    " secondary disk replacement, not"
4106                                    " both at once")
4107       elif self.op.mode == constants.REPLACE_DISK_PRI:
4108         if remote_node is not None:
4109           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4110                                      " the secondary while doing a primary"
4111                                      " node disk replacement")
4112         self.tgt_node = instance.primary_node
4113         self.oth_node = instance.secondary_nodes[0]
4114       elif self.op.mode == constants.REPLACE_DISK_SEC:
4115         self.new_node = remote_node # this can be None, in which case
4116                                     # we don't change the secondary
4117         self.tgt_node = instance.secondary_nodes[0]
4118         self.oth_node = instance.primary_node
4119       else:
4120         raise errors.ProgrammerError("Unhandled disk replace mode")
4121
4122     for name in self.op.disks:
4123       if instance.FindDisk(name) is None:
4124         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4125                                    (name, instance.name))
4126     self.op.remote_node = remote_node
4127
4128   def _ExecRR1(self, feedback_fn):
4129     """Replace the disks of an instance.
4130
4131     """
4132     instance = self.instance
4133     iv_names = {}
4134     # start of work
4135     if self.op.remote_node is None:
4136       remote_node = self.sec_node
4137     else:
4138       remote_node = self.op.remote_node
4139     cfg = self.cfg
4140     for dev in instance.disks:
4141       size = dev.size
4142       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4143       names = _GenerateUniqueNames(cfg, lv_names)
4144       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4145                                        remote_node, size, names)
4146       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4147       logger.Info("adding new mirror component on secondary for %s" %
4148                   dev.iv_name)
4149       #HARDCODE
4150       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4151                                         new_drbd, False,
4152                                         _GetInstanceInfoText(instance)):
4153         raise errors.OpExecError("Failed to create new component on secondary"
4154                                  " node %s. Full abort, cleanup manually!" %
4155                                  remote_node)
4156
4157       logger.Info("adding new mirror component on primary")
4158       #HARDCODE
4159       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4160                                       instance, new_drbd,
4161                                       _GetInstanceInfoText(instance)):
4162         # remove secondary dev
4163         cfg.SetDiskID(new_drbd, remote_node)
4164         rpc.call_blockdev_remove(remote_node, new_drbd)
4165         raise errors.OpExecError("Failed to create volume on primary!"
4166                                  " Full abort, cleanup manually!!")
4167
4168       # the device exists now
4169       # call the primary node to add the mirror to md
4170       logger.Info("adding new mirror component to md")
4171       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4172                                            [new_drbd]):
4173         logger.Error("Can't add mirror compoment to md!")
4174         cfg.SetDiskID(new_drbd, remote_node)
4175         if not rpc.call_blockdev_remove(remote_node, new_drbd):
4176           logger.Error("Can't rollback on secondary")
4177         cfg.SetDiskID(new_drbd, instance.primary_node)
4178         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4179           logger.Error("Can't rollback on primary")
4180         raise errors.OpExecError("Full abort, cleanup manually!!")
4181
4182       dev.children.append(new_drbd)
4183       cfg.AddInstance(instance)
4184
4185     # this can fail as the old devices are degraded and _WaitForSync
4186     # does a combined result over all disks, so we don't check its
4187     # return value
4188     _WaitForSync(cfg, instance, self.proc, unlock=True)
4189
4190     # so check manually all the devices
4191     for name in iv_names:
4192       dev, child, new_drbd = iv_names[name]
4193       cfg.SetDiskID(dev, instance.primary_node)
4194       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4195       if is_degr:
4196         raise errors.OpExecError("MD device %s is degraded!" % name)
4197       cfg.SetDiskID(new_drbd, instance.primary_node)
4198       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4199       if is_degr:
4200         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4201
4202     for name in iv_names:
4203       dev, child, new_drbd = iv_names[name]
4204       logger.Info("remove mirror %s component" % name)
4205       cfg.SetDiskID(dev, instance.primary_node)
4206       if not rpc.call_blockdev_removechildren(instance.primary_node,
4207                                               dev, [child]):
4208         logger.Error("Can't remove child from mirror, aborting"
4209                      " *this device cleanup*.\nYou need to cleanup manually!!")
4210         continue
4211
4212       for node in child.logical_id[:2]:
4213         logger.Info("remove child device on %s" % node)
4214         cfg.SetDiskID(child, node)
4215         if not rpc.call_blockdev_remove(node, child):
4216           logger.Error("Warning: failed to remove device from node %s,"
4217                        " continuing operation." % node)
4218
4219       dev.children.remove(child)
4220
4221       cfg.AddInstance(instance)
4222
4223   def _ExecD8DiskOnly(self, feedback_fn):
4224     """Replace a disk on the primary or secondary for dbrd8.
4225
4226     The algorithm for replace is quite complicated:
4227       - for each disk to be replaced:
4228         - create new LVs on the target node with unique names
4229         - detach old LVs from the drbd device
4230         - rename old LVs to name_replaced.<time_t>
4231         - rename new LVs to old LVs
4232         - attach the new LVs (with the old names now) to the drbd device
4233       - wait for sync across all devices
4234       - for each modified disk:
4235         - remove old LVs (which have the name name_replaces.<time_t>)
4236
4237     Failures are not very well handled.
4238
4239     """
4240     steps_total = 6
4241     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4242     instance = self.instance
4243     iv_names = {}
4244     vgname = self.cfg.GetVGName()
4245     # start of work
4246     cfg = self.cfg
4247     tgt_node = self.tgt_node
4248     oth_node = self.oth_node
4249
4250     # Step: check device activation
4251     self.proc.LogStep(1, steps_total, "check device existence")
4252     info("checking volume groups")
4253     my_vg = cfg.GetVGName()
4254     results = rpc.call_vg_list([oth_node, tgt_node])
4255     if not results:
4256       raise errors.OpExecError("Can't list volume groups on the nodes")
4257     for node in oth_node, tgt_node:
4258       res = results.get(node, False)
4259       if not res or my_vg not in res:
4260         raise errors.OpExecError("Volume group '%s' not found on %s" %
4261                                  (my_vg, node))
4262     for dev in instance.disks:
4263       if not dev.iv_name in self.op.disks:
4264         continue
4265       for node in tgt_node, oth_node:
4266         info("checking %s on %s" % (dev.iv_name, node))
4267         cfg.SetDiskID(dev, node)
4268         if not rpc.call_blockdev_find(node, dev):
4269           raise errors.OpExecError("Can't find device %s on node %s" %
4270                                    (dev.iv_name, node))
4271
4272     # Step: check other node consistency
4273     self.proc.LogStep(2, steps_total, "check peer consistency")
4274     for dev in instance.disks:
4275       if not dev.iv_name in self.op.disks:
4276         continue
4277       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4278       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4279                                    oth_node==instance.primary_node):
4280         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4281                                  " to replace disks on this node (%s)" %
4282                                  (oth_node, tgt_node))
4283
4284     # Step: create new storage
4285     self.proc.LogStep(3, steps_total, "allocate new storage")
4286     for dev in instance.disks:
4287       if not dev.iv_name in self.op.disks:
4288         continue
4289       size = dev.size
4290       cfg.SetDiskID(dev, tgt_node)
4291       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4292       names = _GenerateUniqueNames(cfg, lv_names)
4293       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4294                              logical_id=(vgname, names[0]))
4295       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4296                              logical_id=(vgname, names[1]))
4297       new_lvs = [lv_data, lv_meta]
4298       old_lvs = dev.children
4299       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4300       info("creating new local storage on %s for %s" %
4301            (tgt_node, dev.iv_name))
4302       # since we *always* want to create this LV, we use the
4303       # _Create...OnPrimary (which forces the creation), even if we
4304       # are talking about the secondary node
4305       for new_lv in new_lvs:
4306         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4307                                         _GetInstanceInfoText(instance)):
4308           raise errors.OpExecError("Failed to create new LV named '%s' on"
4309                                    " node '%s'" %
4310                                    (new_lv.logical_id[1], tgt_node))
4311
4312     # Step: for each lv, detach+rename*2+attach
4313     self.proc.LogStep(4, steps_total, "change drbd configuration")
4314     for dev, old_lvs, new_lvs in iv_names.itervalues():
4315       info("detaching %s drbd from local storage" % dev.iv_name)
4316       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4317         raise errors.OpExecError("Can't detach drbd from local storage on node"
4318                                  " %s for device %s" % (tgt_node, dev.iv_name))
4319       #dev.children = []
4320       #cfg.Update(instance)
4321
4322       # ok, we created the new LVs, so now we know we have the needed
4323       # storage; as such, we proceed on the target node to rename
4324       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4325       # using the assumption that logical_id == physical_id (which in
4326       # turn is the unique_id on that node)
4327
4328       # FIXME(iustin): use a better name for the replaced LVs
4329       temp_suffix = int(time.time())
4330       ren_fn = lambda d, suff: (d.physical_id[0],
4331                                 d.physical_id[1] + "_replaced-%s" % suff)
4332       # build the rename list based on what LVs exist on the node
4333       rlist = []
4334       for to_ren in old_lvs:
4335         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4336         if find_res is not None: # device exists
4337           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4338
4339       info("renaming the old LVs on the target node")
4340       if not rpc.call_blockdev_rename(tgt_node, rlist):
4341         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4342       # now we rename the new LVs to the old LVs
4343       info("renaming the new LVs on the target node")
4344       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4345       if not rpc.call_blockdev_rename(tgt_node, rlist):
4346         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4347
4348       for old, new in zip(old_lvs, new_lvs):
4349         new.logical_id = old.logical_id
4350         cfg.SetDiskID(new, tgt_node)
4351
4352       for disk in old_lvs:
4353         disk.logical_id = ren_fn(disk, temp_suffix)
4354         cfg.SetDiskID(disk, tgt_node)
4355
4356       # now that the new lvs have the old name, we can add them to the device
4357       info("adding new mirror component on %s" % tgt_node)
4358       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4359         for new_lv in new_lvs:
4360           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4361             warning("Can't rollback device %s", hint="manually cleanup unused"
4362                     " logical volumes")
4363         raise errors.OpExecError("Can't add local storage to drbd")
4364
4365       dev.children = new_lvs
4366       cfg.Update(instance)
4367
4368     # Step: wait for sync
4369
4370     # this can fail as the old devices are degraded and _WaitForSync
4371     # does a combined result over all disks, so we don't check its
4372     # return value
4373     self.proc.LogStep(5, steps_total, "sync devices")
4374     _WaitForSync(cfg, instance, self.proc, unlock=True)
4375
4376     # so check manually all the devices
4377     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4378       cfg.SetDiskID(dev, instance.primary_node)
4379       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4380       if is_degr:
4381         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4382
4383     # Step: remove old storage
4384     self.proc.LogStep(6, steps_total, "removing old storage")
4385     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4386       info("remove logical volumes for %s" % name)
4387       for lv in old_lvs:
4388         cfg.SetDiskID(lv, tgt_node)
4389         if not rpc.call_blockdev_remove(tgt_node, lv):
4390           warning("Can't remove old LV", hint="manually remove unused LVs")
4391           continue
4392
4393   def _ExecD8Secondary(self, feedback_fn):
4394     """Replace the secondary node for drbd8.
4395
4396     The algorithm for replace is quite complicated:
4397       - for all disks of the instance:
4398         - create new LVs on the new node with same names
4399         - shutdown the drbd device on the old secondary
4400         - disconnect the drbd network on the primary
4401         - create the drbd device on the new secondary
4402         - network attach the drbd on the primary, using an artifice:
4403           the drbd code for Attach() will connect to the network if it
4404           finds a device which is connected to the good local disks but
4405           not network enabled
4406       - wait for sync across all devices
4407       - remove all disks from the old secondary
4408
4409     Failures are not very well handled.
4410
4411     """
4412     steps_total = 6
4413     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4414     instance = self.instance
4415     iv_names = {}
4416     vgname = self.cfg.GetVGName()
4417     # start of work
4418     cfg = self.cfg
4419     old_node = self.tgt_node
4420     new_node = self.new_node
4421     pri_node = instance.primary_node
4422
4423     # Step: check device activation
4424     self.proc.LogStep(1, steps_total, "check device existence")
4425     info("checking volume groups")
4426     my_vg = cfg.GetVGName()
4427     results = rpc.call_vg_list([pri_node, new_node])
4428     if not results:
4429       raise errors.OpExecError("Can't list volume groups on the nodes")
4430     for node in pri_node, new_node:
4431       res = results.get(node, False)
4432       if not res or my_vg not in res:
4433         raise errors.OpExecError("Volume group '%s' not found on %s" %
4434                                  (my_vg, node))
4435     for dev in instance.disks:
4436       if not dev.iv_name in self.op.disks:
4437         continue
4438       info("checking %s on %s" % (dev.iv_name, pri_node))
4439       cfg.SetDiskID(dev, pri_node)
4440       if not rpc.call_blockdev_find(pri_node, dev):
4441         raise errors.OpExecError("Can't find device %s on node %s" %
4442                                  (dev.iv_name, pri_node))
4443
4444     # Step: check other node consistency
4445     self.proc.LogStep(2, steps_total, "check peer consistency")
4446     for dev in instance.disks:
4447       if not dev.iv_name in self.op.disks:
4448         continue
4449       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4450       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4451         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4452                                  " unsafe to replace the secondary" %
4453                                  pri_node)
4454
4455     # Step: create new storage
4456     self.proc.LogStep(3, steps_total, "allocate new storage")
4457     for dev in instance.disks:
4458       size = dev.size
4459       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4460       # since we *always* want to create this LV, we use the
4461       # _Create...OnPrimary (which forces the creation), even if we
4462       # are talking about the secondary node
4463       for new_lv in dev.children:
4464         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4465                                         _GetInstanceInfoText(instance)):
4466           raise errors.OpExecError("Failed to create new LV named '%s' on"
4467                                    " node '%s'" %
4468                                    (new_lv.logical_id[1], new_node))
4469
4470       iv_names[dev.iv_name] = (dev, dev.children)
4471
4472     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4473     for dev in instance.disks:
4474       size = dev.size
4475       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4476       # create new devices on new_node
4477       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4478                               logical_id=(pri_node, new_node,
4479                                           dev.logical_id[2]),
4480                               children=dev.children)
4481       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4482                                         new_drbd, False,
4483                                       _GetInstanceInfoText(instance)):
4484         raise errors.OpExecError("Failed to create new DRBD on"
4485                                  " node '%s'" % new_node)
4486
4487     for dev in instance.disks:
4488       # we have new devices, shutdown the drbd on the old secondary
4489       info("shutting down drbd for %s on old node" % dev.iv_name)
4490       cfg.SetDiskID(dev, old_node)
4491       if not rpc.call_blockdev_shutdown(old_node, dev):
4492         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4493                 hint="Please cleanup this device manually as soon as possible")
4494
4495     info("detaching primary drbds from the network (=> standalone)")
4496     done = 0
4497     for dev in instance.disks:
4498       cfg.SetDiskID(dev, pri_node)
4499       # set the physical (unique in bdev terms) id to None, meaning
4500       # detach from network
4501       dev.physical_id = (None,) * len(dev.physical_id)
4502       # and 'find' the device, which will 'fix' it to match the
4503       # standalone state
4504       if rpc.call_blockdev_find(pri_node, dev):
4505         done += 1
4506       else:
4507         warning("Failed to detach drbd %s from network, unusual case" %
4508                 dev.iv_name)
4509
4510     if not done:
4511       # no detaches succeeded (very unlikely)
4512       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4513
4514     # if we managed to detach at least one, we update all the disks of
4515     # the instance to point to the new secondary
4516     info("updating instance configuration")
4517     for dev in instance.disks:
4518       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4519       cfg.SetDiskID(dev, pri_node)
4520     cfg.Update(instance)
4521
4522     # and now perform the drbd attach
4523     info("attaching primary drbds to new secondary (standalone => connected)")
4524     failures = []
4525     for dev in instance.disks:
4526       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4527       # since the attach is smart, it's enough to 'find' the device,
4528       # it will automatically activate the network, if the physical_id
4529       # is correct
4530       cfg.SetDiskID(dev, pri_node)
4531       if not rpc.call_blockdev_find(pri_node, dev):
4532         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4533                 "please do a gnt-instance info to see the status of disks")
4534
4535     # this can fail as the old devices are degraded and _WaitForSync
4536     # does a combined result over all disks, so we don't check its
4537     # return value
4538     self.proc.LogStep(5, steps_total, "sync devices")
4539     _WaitForSync(cfg, instance, self.proc, unlock=True)
4540
4541     # so check manually all the devices
4542     for name, (dev, old_lvs) in iv_names.iteritems():
4543       cfg.SetDiskID(dev, pri_node)
4544       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4545       if is_degr:
4546         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4547
4548     self.proc.LogStep(6, steps_total, "removing old storage")
4549     for name, (dev, old_lvs) in iv_names.iteritems():
4550       info("remove logical volumes for %s" % name)
4551       for lv in old_lvs:
4552         cfg.SetDiskID(lv, old_node)
4553         if not rpc.call_blockdev_remove(old_node, lv):
4554           warning("Can't remove LV on old secondary",
4555                   hint="Cleanup stale volumes by hand")
4556
4557   def Exec(self, feedback_fn):
4558     """Execute disk replacement.
4559
4560     This dispatches the disk replacement to the appropriate handler.
4561
4562     """
4563     instance = self.instance
4564
4565     # Activate the instance disks if we're replacing them on a down instance
4566     if instance.status == "down":
4567       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4568       self.proc.ChainOpCode(op)
4569
4570     if instance.disk_template == constants.DT_REMOTE_RAID1:
4571       fn = self._ExecRR1
4572     elif instance.disk_template == constants.DT_DRBD8:
4573       if self.op.remote_node is None:
4574         fn = self._ExecD8DiskOnly
4575       else:
4576         fn = self._ExecD8Secondary
4577     else:
4578       raise errors.ProgrammerError("Unhandled disk replacement case")
4579
4580     ret = fn(feedback_fn)
4581
4582     # Deactivate the instance disks if we're replacing them on a down instance
4583     if instance.status == "down":
4584       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4585       self.proc.ChainOpCode(op)
4586
4587     return ret
4588
4589
4590 class LUGrowDisk(LogicalUnit):
4591   """Grow a disk of an instance.
4592
4593   """
4594   HPATH = "disk-grow"
4595   HTYPE = constants.HTYPE_INSTANCE
4596   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4597
4598   def BuildHooksEnv(self):
4599     """Build hooks env.
4600
4601     This runs on the master, the primary and all the secondaries.
4602
4603     """
4604     env = {
4605       "DISK": self.op.disk,
4606       "AMOUNT": self.op.amount,
4607       }
4608     env.update(_BuildInstanceHookEnvByObject(self.instance))
4609     nl = [
4610       self.sstore.GetMasterNode(),
4611       self.instance.primary_node,
4612       ]
4613     return env, nl, nl
4614
4615   def CheckPrereq(self):
4616     """Check prerequisites.
4617
4618     This checks that the instance is in the cluster.
4619
4620     """
4621     instance = self.cfg.GetInstanceInfo(
4622       self.cfg.ExpandInstanceName(self.op.instance_name))
4623     if instance is None:
4624       raise errors.OpPrereqError("Instance '%s' not known" %
4625                                  self.op.instance_name)
4626
4627     if self.op.amount <= 0:
4628       raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4629
4630     self.instance = instance
4631     self.op.instance_name = instance.name
4632
4633     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4634       raise errors.OpPrereqError("Instance's disk layout does not support"
4635                                  " growing.")
4636
4637     self.disk = instance.FindDisk(self.op.disk)
4638     if self.disk is None:
4639       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4640                                  (self.op.disk, instance.name))
4641
4642     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4643     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4644     for node in nodenames:
4645       info = nodeinfo.get(node, None)
4646       if not info:
4647         raise errors.OpPrereqError("Cannot get current information"
4648                                    " from node '%s'" % node)
4649       vg_free = info.get('vg_free', None)
4650       if not isinstance(vg_free, int):
4651         raise errors.OpPrereqError("Can't compute free disk space on"
4652                                    " node %s" % node)
4653       if self.op.amount > info['vg_free']:
4654         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4655                                    " %d MiB available, %d MiB required" %
4656                                    (node, info['vg_free'], self.op.amount))
4657       is_primary = (node == instance.primary_node)
4658       if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4659         raise errors.OpPrereqError("Disk %s is degraded or not fully"
4660                                  " synchronized on node %s,"
4661                                  " aborting grow." % (self.op.disk, node))
4662
4663   def Exec(self, feedback_fn):
4664     """Execute disk grow.
4665
4666     """
4667     instance = self.instance
4668     disk = self.disk
4669     for node in (instance.secondary_nodes + (instance.primary_node,)):
4670       self.cfg.SetDiskID(disk, node)
4671       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4672       if not result or not isinstance(result, tuple) or len(result) != 2:
4673         raise errors.OpExecError("grow request failed to node %s" % node)
4674       elif not result[0]:
4675         raise errors.OpExecError("grow request failed to node %s: %s" %
4676                                  (node, result[1]))
4677     disk.RecordGrow(self.op.amount)
4678     self.cfg.Update(instance)
4679     if self.op.wait_for_sync:
4680       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4681       if disk_abort:
4682         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4683                      " Please check the instance.")
4684
4685
4686 class LUQueryInstanceData(NoHooksLU):
4687   """Query runtime instance data.
4688
4689   """
4690   _OP_REQP = ["instances"]
4691
4692   def CheckPrereq(self):
4693     """Check prerequisites.
4694
4695     This only checks the optional instance list against the existing names.
4696
4697     """
4698     if not isinstance(self.op.instances, list):
4699       raise errors.OpPrereqError("Invalid argument type 'instances'")
4700     if self.op.instances:
4701       self.wanted_instances = []
4702       names = self.op.instances
4703       for name in names:
4704         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4705         if instance is None:
4706           raise errors.OpPrereqError("No such instance name '%s'" % name)
4707         self.wanted_instances.append(instance)
4708     else:
4709       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4710                                in self.cfg.GetInstanceList()]
4711     return
4712
4713
4714   def _ComputeDiskStatus(self, instance, snode, dev):
4715     """Compute block device status.
4716
4717     """
4718     self.cfg.SetDiskID(dev, instance.primary_node)
4719     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4720     if dev.dev_type in constants.LDS_DRBD:
4721       # we change the snode then (otherwise we use the one passed in)
4722       if dev.logical_id[0] == instance.primary_node:
4723         snode = dev.logical_id[1]
4724       else:
4725         snode = dev.logical_id[0]
4726
4727     if snode:
4728       self.cfg.SetDiskID(dev, snode)
4729       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4730     else:
4731       dev_sstatus = None
4732
4733     if dev.children:
4734       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4735                       for child in dev.children]
4736     else:
4737       dev_children = []
4738
4739     data = {
4740       "iv_name": dev.iv_name,
4741       "dev_type": dev.dev_type,
4742       "logical_id": dev.logical_id,
4743       "physical_id": dev.physical_id,
4744       "pstatus": dev_pstatus,
4745       "sstatus": dev_sstatus,
4746       "children": dev_children,
4747       }
4748
4749     return data
4750
4751   def Exec(self, feedback_fn):
4752     """Gather and return data"""
4753     result = {}
4754     for instance in self.wanted_instances:
4755       remote_info = rpc.call_instance_info(instance.primary_node,
4756                                                 instance.name)
4757       if remote_info and "state" in remote_info:
4758         remote_state = "up"
4759       else:
4760         remote_state = "down"
4761       if instance.status == "down":
4762         config_state = "down"
4763       else:
4764         config_state = "up"
4765
4766       disks = [self._ComputeDiskStatus(instance, None, device)
4767                for device in instance.disks]
4768
4769       idict = {
4770         "name": instance.name,
4771         "config_state": config_state,
4772         "run_state": remote_state,
4773         "pnode": instance.primary_node,
4774         "snodes": instance.secondary_nodes,
4775         "os": instance.os,
4776         "memory": instance.memory,
4777         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4778         "disks": disks,
4779         "vcpus": instance.vcpus,
4780         }
4781
4782       htkind = self.sstore.GetHypervisorType()
4783       if htkind == constants.HT_XEN_PVM30:
4784         idict["kernel_path"] = instance.kernel_path
4785         idict["initrd_path"] = instance.initrd_path
4786
4787       if htkind == constants.HT_XEN_HVM31:
4788         idict["hvm_boot_order"] = instance.hvm_boot_order
4789         idict["hvm_acpi"] = instance.hvm_acpi
4790         idict["hvm_pae"] = instance.hvm_pae
4791         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4792
4793       if htkind in constants.HTS_REQ_PORT:
4794         idict["vnc_bind_address"] = instance.vnc_bind_address
4795         idict["network_port"] = instance.network_port
4796
4797       result[instance.name] = idict
4798
4799     return result
4800
4801
4802 class LUSetInstanceParms(LogicalUnit):
4803   """Modifies an instances's parameters.
4804
4805   """
4806   HPATH = "instance-modify"
4807   HTYPE = constants.HTYPE_INSTANCE
4808   _OP_REQP = ["instance_name"]
4809
4810   def BuildHooksEnv(self):
4811     """Build hooks env.
4812
4813     This runs on the master, primary and secondaries.
4814
4815     """
4816     args = dict()
4817     if self.mem:
4818       args['memory'] = self.mem
4819     if self.vcpus:
4820       args['vcpus'] = self.vcpus
4821     if self.do_ip or self.do_bridge or self.mac:
4822       if self.do_ip:
4823         ip = self.ip
4824       else:
4825         ip = self.instance.nics[0].ip
4826       if self.bridge:
4827         bridge = self.bridge
4828       else:
4829         bridge = self.instance.nics[0].bridge
4830       if self.mac:
4831         mac = self.mac
4832       else:
4833         mac = self.instance.nics[0].mac
4834       args['nics'] = [(ip, bridge, mac)]
4835     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4836     nl = [self.sstore.GetMasterNode(),
4837           self.instance.primary_node] + list(self.instance.secondary_nodes)
4838     return env, nl, nl
4839
4840   def CheckPrereq(self):
4841     """Check prerequisites.
4842
4843     This only checks the instance list against the existing names.
4844
4845     """
4846     self.mem = getattr(self.op, "mem", None)
4847     self.vcpus = getattr(self.op, "vcpus", None)
4848     self.ip = getattr(self.op, "ip", None)
4849     self.mac = getattr(self.op, "mac", None)
4850     self.bridge = getattr(self.op, "bridge", None)
4851     self.kernel_path = getattr(self.op, "kernel_path", None)
4852     self.initrd_path = getattr(self.op, "initrd_path", None)
4853     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4854     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4855     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4856     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4857     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4858     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4859                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4860                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4861                  self.vnc_bind_address]
4862     if all_parms.count(None) == len(all_parms):
4863       raise errors.OpPrereqError("No changes submitted")
4864     if self.mem is not None:
4865       try:
4866         self.mem = int(self.mem)
4867       except ValueError, err:
4868         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4869     if self.vcpus is not None:
4870       try:
4871         self.vcpus = int(self.vcpus)
4872       except ValueError, err:
4873         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4874     if self.ip is not None:
4875       self.do_ip = True
4876       if self.ip.lower() == "none":
4877         self.ip = None
4878       else:
4879         if not utils.IsValidIP(self.ip):
4880           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4881     else:
4882       self.do_ip = False
4883     self.do_bridge = (self.bridge is not None)
4884     if self.mac is not None:
4885       if self.cfg.IsMacInUse(self.mac):
4886         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4887                                    self.mac)
4888       if not utils.IsValidMac(self.mac):
4889         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4890
4891     if self.kernel_path is not None:
4892       self.do_kernel_path = True
4893       if self.kernel_path == constants.VALUE_NONE:
4894         raise errors.OpPrereqError("Can't set instance to no kernel")
4895
4896       if self.kernel_path != constants.VALUE_DEFAULT:
4897         if not os.path.isabs(self.kernel_path):
4898           raise errors.OpPrereqError("The kernel path must be an absolute"
4899                                     " filename")
4900     else:
4901       self.do_kernel_path = False
4902
4903     if self.initrd_path is not None:
4904       self.do_initrd_path = True
4905       if self.initrd_path not in (constants.VALUE_NONE,
4906                                   constants.VALUE_DEFAULT):
4907         if not os.path.isabs(self.initrd_path):
4908           raise errors.OpPrereqError("The initrd path must be an absolute"
4909                                     " filename")
4910     else:
4911       self.do_initrd_path = False
4912
4913     # boot order verification
4914     if self.hvm_boot_order is not None:
4915       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4916         if len(self.hvm_boot_order.strip("acdn")) != 0:
4917           raise errors.OpPrereqError("invalid boot order specified,"
4918                                      " must be one or more of [acdn]"
4919                                      " or 'default'")
4920
4921     # hvm_cdrom_image_path verification
4922     if self.op.hvm_cdrom_image_path is not None:
4923       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4924               self.op.hvm_cdrom_image_path.lower() == "none"):
4925         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4926                                    " be an absolute path or None, not %s" %
4927                                    self.op.hvm_cdrom_image_path)
4928       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4929               self.op.hvm_cdrom_image_path.lower() == "none"):
4930         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4931                                    " regular file or a symlink pointing to"
4932                                    " an existing regular file, not %s" %
4933                                    self.op.hvm_cdrom_image_path)
4934
4935     # vnc_bind_address verification
4936     if self.op.vnc_bind_address is not None:
4937       if not utils.IsValidIP(self.op.vnc_bind_address):
4938         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4939                                    " like a valid IP address" %
4940                                    self.op.vnc_bind_address)
4941
4942     instance = self.cfg.GetInstanceInfo(
4943       self.cfg.ExpandInstanceName(self.op.instance_name))
4944     if instance is None:
4945       raise errors.OpPrereqError("No such instance name '%s'" %
4946                                  self.op.instance_name)
4947     self.op.instance_name = instance.name
4948     self.instance = instance
4949     return
4950
4951   def Exec(self, feedback_fn):
4952     """Modifies an instance.
4953
4954     All parameters take effect only at the next restart of the instance.
4955     """
4956     result = []
4957     instance = self.instance
4958     if self.mem:
4959       instance.memory = self.mem
4960       result.append(("mem", self.mem))
4961     if self.vcpus:
4962       instance.vcpus = self.vcpus
4963       result.append(("vcpus",  self.vcpus))
4964     if self.do_ip:
4965       instance.nics[0].ip = self.ip
4966       result.append(("ip", self.ip))
4967     if self.bridge:
4968       instance.nics[0].bridge = self.bridge
4969       result.append(("bridge", self.bridge))
4970     if self.mac:
4971       instance.nics[0].mac = self.mac
4972       result.append(("mac", self.mac))
4973     if self.do_kernel_path:
4974       instance.kernel_path = self.kernel_path
4975       result.append(("kernel_path", self.kernel_path))
4976     if self.do_initrd_path:
4977       instance.initrd_path = self.initrd_path
4978       result.append(("initrd_path", self.initrd_path))
4979     if self.hvm_boot_order:
4980       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4981         instance.hvm_boot_order = None
4982       else:
4983         instance.hvm_boot_order = self.hvm_boot_order
4984       result.append(("hvm_boot_order", self.hvm_boot_order))
4985     if self.hvm_acpi is not None:
4986       instance.hvm_acpi = self.hvm_acpi
4987       result.append(("hvm_acpi", self.hvm_acpi))
4988     if self.hvm_pae is not None:
4989       instance.hvm_pae = self.hvm_pae
4990       result.append(("hvm_pae", self.hvm_pae))
4991     if self.hvm_cdrom_image_path:
4992       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4993         instance.hvm_cdrom_image_path = None
4994       else:
4995         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4996       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4997     if self.vnc_bind_address:
4998       instance.vnc_bind_address = self.vnc_bind_address
4999       result.append(("vnc_bind_address", self.vnc_bind_address))
5000
5001     self.cfg.AddInstance(instance)
5002
5003     return result
5004
5005
5006 class LUQueryExports(NoHooksLU):
5007   """Query the exports list
5008
5009   """
5010   _OP_REQP = []
5011
5012   def CheckPrereq(self):
5013     """Check that the nodelist contains only existing nodes.
5014
5015     """
5016     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5017
5018   def Exec(self, feedback_fn):
5019     """Compute the list of all the exported system images.
5020
5021     Returns:
5022       a dictionary with the structure node->(export-list)
5023       where export-list is a list of the instances exported on
5024       that node.
5025
5026     """
5027     return rpc.call_export_list(self.nodes)
5028
5029
5030 class LUExportInstance(LogicalUnit):
5031   """Export an instance to an image in the cluster.
5032
5033   """
5034   HPATH = "instance-export"
5035   HTYPE = constants.HTYPE_INSTANCE
5036   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5037
5038   def BuildHooksEnv(self):
5039     """Build hooks env.
5040
5041     This will run on the master, primary node and target node.
5042
5043     """
5044     env = {
5045       "EXPORT_NODE": self.op.target_node,
5046       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5047       }
5048     env.update(_BuildInstanceHookEnvByObject(self.instance))
5049     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5050           self.op.target_node]
5051     return env, nl, nl
5052
5053   def CheckPrereq(self):
5054     """Check prerequisites.
5055
5056     This checks that the instance and node names are valid.
5057
5058     """
5059     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5060     self.instance = self.cfg.GetInstanceInfo(instance_name)
5061     if self.instance is None:
5062       raise errors.OpPrereqError("Instance '%s' not found" %
5063                                  self.op.instance_name)
5064
5065     # node verification
5066     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5067     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5068
5069     if self.dst_node is None:
5070       raise errors.OpPrereqError("Destination node '%s' is unknown." %
5071                                  self.op.target_node)
5072     self.op.target_node = self.dst_node.name
5073
5074   def Exec(self, feedback_fn):
5075     """Export an instance to an image in the cluster.
5076
5077     """
5078     instance = self.instance
5079     dst_node = self.dst_node
5080     src_node = instance.primary_node
5081     if self.op.shutdown:
5082       # shutdown the instance, but not the disks
5083       if not rpc.call_instance_shutdown(src_node, instance):
5084         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5085                                  (instance.name, src_node))
5086
5087     vgname = self.cfg.GetVGName()
5088
5089     snap_disks = []
5090
5091     try:
5092       for disk in instance.disks:
5093         if disk.iv_name == "sda":
5094           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5095           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5096
5097           if not new_dev_name:
5098             logger.Error("could not snapshot block device %s on node %s" %
5099                          (disk.logical_id[1], src_node))
5100           else:
5101             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5102                                       logical_id=(vgname, new_dev_name),
5103                                       physical_id=(vgname, new_dev_name),
5104                                       iv_name=disk.iv_name)
5105             snap_disks.append(new_dev)
5106
5107     finally:
5108       if self.op.shutdown and instance.status == "up":
5109         if not rpc.call_instance_start(src_node, instance, None):
5110           _ShutdownInstanceDisks(instance, self.cfg)
5111           raise errors.OpExecError("Could not start instance")
5112
5113     # TODO: check for size
5114
5115     for dev in snap_disks:
5116       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5117                                            instance):
5118         logger.Error("could not export block device %s from node"
5119                      " %s to node %s" %
5120                      (dev.logical_id[1], src_node, dst_node.name))
5121       if not rpc.call_blockdev_remove(src_node, dev):
5122         logger.Error("could not remove snapshot block device %s from"
5123                      " node %s" % (dev.logical_id[1], src_node))
5124
5125     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5126       logger.Error("could not finalize export for instance %s on node %s" %
5127                    (instance.name, dst_node.name))
5128
5129     nodelist = self.cfg.GetNodeList()
5130     nodelist.remove(dst_node.name)
5131
5132     # on one-node clusters nodelist will be empty after the removal
5133     # if we proceed the backup would be removed because OpQueryExports
5134     # substitutes an empty list with the full cluster node list.
5135     if nodelist:
5136       op = opcodes.OpQueryExports(nodes=nodelist)
5137       exportlist = self.proc.ChainOpCode(op)
5138       for node in exportlist:
5139         if instance.name in exportlist[node]:
5140           if not rpc.call_export_remove(node, instance.name):
5141             logger.Error("could not remove older export for instance %s"
5142                          " on node %s" % (instance.name, node))
5143
5144
5145 class LURemoveExport(NoHooksLU):
5146   """Remove exports related to the named instance.
5147
5148   """
5149   _OP_REQP = ["instance_name"]
5150
5151   def CheckPrereq(self):
5152     """Check prerequisites.
5153     """
5154     pass
5155
5156   def Exec(self, feedback_fn):
5157     """Remove any export.
5158
5159     """
5160     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5161     # If the instance was not found we'll try with the name that was passed in.
5162     # This will only work if it was an FQDN, though.
5163     fqdn_warn = False
5164     if not instance_name:
5165       fqdn_warn = True
5166       instance_name = self.op.instance_name
5167
5168     op = opcodes.OpQueryExports(nodes=[])
5169     exportlist = self.proc.ChainOpCode(op)
5170     found = False
5171     for node in exportlist:
5172       if instance_name in exportlist[node]:
5173         found = True
5174         if not rpc.call_export_remove(node, instance_name):
5175           logger.Error("could not remove export for instance %s"
5176                        " on node %s" % (instance_name, node))
5177
5178     if fqdn_warn and not found:
5179       feedback_fn("Export not found. If trying to remove an export belonging"
5180                   " to a deleted instance please use its Fully Qualified"
5181                   " Domain Name.")
5182
5183
5184 class TagsLU(NoHooksLU):
5185   """Generic tags LU.
5186
5187   This is an abstract class which is the parent of all the other tags LUs.
5188
5189   """
5190   def CheckPrereq(self):
5191     """Check prerequisites.
5192
5193     """
5194     if self.op.kind == constants.TAG_CLUSTER:
5195       self.target = self.cfg.GetClusterInfo()
5196     elif self.op.kind == constants.TAG_NODE:
5197       name = self.cfg.ExpandNodeName(self.op.name)
5198       if name is None:
5199         raise errors.OpPrereqError("Invalid node name (%s)" %
5200                                    (self.op.name,))
5201       self.op.name = name
5202       self.target = self.cfg.GetNodeInfo(name)
5203     elif self.op.kind == constants.TAG_INSTANCE:
5204       name = self.cfg.ExpandInstanceName(self.op.name)
5205       if name is None:
5206         raise errors.OpPrereqError("Invalid instance name (%s)" %
5207                                    (self.op.name,))
5208       self.op.name = name
5209       self.target = self.cfg.GetInstanceInfo(name)
5210     else:
5211       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5212                                  str(self.op.kind))
5213
5214
5215 class LUGetTags(TagsLU):
5216   """Returns the tags of a given object.
5217
5218   """
5219   _OP_REQP = ["kind", "name"]
5220
5221   def Exec(self, feedback_fn):
5222     """Returns the tag list.
5223
5224     """
5225     return self.target.GetTags()
5226
5227
5228 class LUSearchTags(NoHooksLU):
5229   """Searches the tags for a given pattern.
5230
5231   """
5232   _OP_REQP = ["pattern"]
5233
5234   def CheckPrereq(self):
5235     """Check prerequisites.
5236
5237     This checks the pattern passed for validity by compiling it.
5238
5239     """
5240     try:
5241       self.re = re.compile(self.op.pattern)
5242     except re.error, err:
5243       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5244                                  (self.op.pattern, err))
5245
5246   def Exec(self, feedback_fn):
5247     """Returns the tag list.
5248
5249     """
5250     cfg = self.cfg
5251     tgts = [("/cluster", cfg.GetClusterInfo())]
5252     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5253     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5254     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5255     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5256     results = []
5257     for path, target in tgts:
5258       for tag in target.GetTags():
5259         if self.re.search(tag):
5260           results.append((path, tag))
5261     return results
5262
5263
5264 class LUAddTags(TagsLU):
5265   """Sets a tag on a given object.
5266
5267   """
5268   _OP_REQP = ["kind", "name", "tags"]
5269
5270   def CheckPrereq(self):
5271     """Check prerequisites.
5272
5273     This checks the type and length of the tag name and value.
5274
5275     """
5276     TagsLU.CheckPrereq(self)
5277     for tag in self.op.tags:
5278       objects.TaggableObject.ValidateTag(tag)
5279
5280   def Exec(self, feedback_fn):
5281     """Sets the tag.
5282
5283     """
5284     try:
5285       for tag in self.op.tags:
5286         self.target.AddTag(tag)
5287     except errors.TagError, err:
5288       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5289     try:
5290       self.cfg.Update(self.target)
5291     except errors.ConfigurationError:
5292       raise errors.OpRetryError("There has been a modification to the"
5293                                 " config file and the operation has been"
5294                                 " aborted. Please retry.")
5295
5296
5297 class LUDelTags(TagsLU):
5298   """Delete a list of tags from a given object.
5299
5300   """
5301   _OP_REQP = ["kind", "name", "tags"]
5302
5303   def CheckPrereq(self):
5304     """Check prerequisites.
5305
5306     This checks that we have the given tag.
5307
5308     """
5309     TagsLU.CheckPrereq(self)
5310     for tag in self.op.tags:
5311       objects.TaggableObject.ValidateTag(tag, removal=True)
5312     del_tags = frozenset(self.op.tags)
5313     cur_tags = self.target.GetTags()
5314     if not del_tags <= cur_tags:
5315       diff_tags = del_tags - cur_tags
5316       diff_names = ["'%s'" % tag for tag in diff_tags]
5317       diff_names.sort()
5318       raise errors.OpPrereqError("Tag(s) %s not found" %
5319                                  (",".join(diff_names)))
5320
5321   def Exec(self, feedback_fn):
5322     """Remove the tag from the object.
5323
5324     """
5325     for tag in self.op.tags:
5326       self.target.RemoveTag(tag)
5327     try:
5328       self.cfg.Update(self.target)
5329     except errors.ConfigurationError:
5330       raise errors.OpRetryError("There has been a modification to the"
5331                                 " config file and the operation has been"
5332                                 " aborted. Please retry.")
5333
5334 class LUTestDelay(NoHooksLU):
5335   """Sleep for a specified amount of time.
5336
5337   This LU sleeps on the master and/or nodes for a specified amoutn of
5338   time.
5339
5340   """
5341   _OP_REQP = ["duration", "on_master", "on_nodes"]
5342
5343   def CheckPrereq(self):
5344     """Check prerequisites.
5345
5346     This checks that we have a good list of nodes and/or the duration
5347     is valid.
5348
5349     """
5350
5351     if self.op.on_nodes:
5352       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5353
5354   def Exec(self, feedback_fn):
5355     """Do the actual sleep.
5356
5357     """
5358     if self.op.on_master:
5359       if not utils.TestDelay(self.op.duration):
5360         raise errors.OpExecError("Error during master delay test")
5361     if self.op.on_nodes:
5362       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5363       if not result:
5364         raise errors.OpExecError("Complete failure from rpc call")
5365       for node, node_result in result.items():
5366         if not node_result:
5367           raise errors.OpExecError("Failure during rpc call to node %s,"
5368                                    " result: %s" % (node, node_result))
5369
5370
5371 class IAllocator(object):
5372   """IAllocator framework.
5373
5374   An IAllocator instance has three sets of attributes:
5375     - cfg/sstore that are needed to query the cluster
5376     - input data (all members of the _KEYS class attribute are required)
5377     - four buffer attributes (in|out_data|text), that represent the
5378       input (to the external script) in text and data structure format,
5379       and the output from it, again in two formats
5380     - the result variables from the script (success, info, nodes) for
5381       easy usage
5382
5383   """
5384   _ALLO_KEYS = [
5385     "mem_size", "disks", "disk_template",
5386     "os", "tags", "nics", "vcpus",
5387     ]
5388   _RELO_KEYS = [
5389     "relocate_from",
5390     ]
5391
5392   def __init__(self, cfg, sstore, mode, name, **kwargs):
5393     self.cfg = cfg
5394     self.sstore = sstore
5395     # init buffer variables
5396     self.in_text = self.out_text = self.in_data = self.out_data = None
5397     # init all input fields so that pylint is happy
5398     self.mode = mode
5399     self.name = name
5400     self.mem_size = self.disks = self.disk_template = None
5401     self.os = self.tags = self.nics = self.vcpus = None
5402     self.relocate_from = None
5403     # computed fields
5404     self.required_nodes = None
5405     # init result fields
5406     self.success = self.info = self.nodes = None
5407     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5408       keyset = self._ALLO_KEYS
5409     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5410       keyset = self._RELO_KEYS
5411     else:
5412       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5413                                    " IAllocator" % self.mode)
5414     for key in kwargs:
5415       if key not in keyset:
5416         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5417                                      " IAllocator" % key)
5418       setattr(self, key, kwargs[key])
5419     for key in keyset:
5420       if key not in kwargs:
5421         raise errors.ProgrammerError("Missing input parameter '%s' to"
5422                                      " IAllocator" % key)
5423     self._BuildInputData()
5424
5425   def _ComputeClusterData(self):
5426     """Compute the generic allocator input data.
5427
5428     This is the data that is independent of the actual operation.
5429
5430     """
5431     cfg = self.cfg
5432     # cluster data
5433     data = {
5434       "version": 1,
5435       "cluster_name": self.sstore.GetClusterName(),
5436       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5437       "hypervisor_type": self.sstore.GetHypervisorType(),
5438       # we don't have job IDs
5439       }
5440
5441     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5442
5443     # node data
5444     node_results = {}
5445     node_list = cfg.GetNodeList()
5446     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5447     for nname in node_list:
5448       ninfo = cfg.GetNodeInfo(nname)
5449       if nname not in node_data or not isinstance(node_data[nname], dict):
5450         raise errors.OpExecError("Can't get data for node %s" % nname)
5451       remote_info = node_data[nname]
5452       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5453                    'vg_size', 'vg_free', 'cpu_total']:
5454         if attr not in remote_info:
5455           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5456                                    (nname, attr))
5457         try:
5458           remote_info[attr] = int(remote_info[attr])
5459         except ValueError, err:
5460           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5461                                    " %s" % (nname, attr, str(err)))
5462       # compute memory used by primary instances
5463       i_p_mem = i_p_up_mem = 0
5464       for iinfo in i_list:
5465         if iinfo.primary_node == nname:
5466           i_p_mem += iinfo.memory
5467           if iinfo.status == "up":
5468             i_p_up_mem += iinfo.memory
5469
5470       # compute memory used by instances
5471       pnr = {
5472         "tags": list(ninfo.GetTags()),
5473         "total_memory": remote_info['memory_total'],
5474         "reserved_memory": remote_info['memory_dom0'],
5475         "free_memory": remote_info['memory_free'],
5476         "i_pri_memory": i_p_mem,
5477         "i_pri_up_memory": i_p_up_mem,
5478         "total_disk": remote_info['vg_size'],
5479         "free_disk": remote_info['vg_free'],
5480         "primary_ip": ninfo.primary_ip,
5481         "secondary_ip": ninfo.secondary_ip,
5482         "total_cpus": remote_info['cpu_total'],
5483         }
5484       node_results[nname] = pnr
5485     data["nodes"] = node_results
5486
5487     # instance data
5488     instance_data = {}
5489     for iinfo in i_list:
5490       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5491                   for n in iinfo.nics]
5492       pir = {
5493         "tags": list(iinfo.GetTags()),
5494         "should_run": iinfo.status == "up",
5495         "vcpus": iinfo.vcpus,
5496         "memory": iinfo.memory,
5497         "os": iinfo.os,
5498         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5499         "nics": nic_data,
5500         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5501         "disk_template": iinfo.disk_template,
5502         }
5503       instance_data[iinfo.name] = pir
5504
5505     data["instances"] = instance_data
5506
5507     self.in_data = data
5508
5509   def _AddNewInstance(self):
5510     """Add new instance data to allocator structure.
5511
5512     This in combination with _AllocatorGetClusterData will create the
5513     correct structure needed as input for the allocator.
5514
5515     The checks for the completeness of the opcode must have already been
5516     done.
5517
5518     """
5519     data = self.in_data
5520     if len(self.disks) != 2:
5521       raise errors.OpExecError("Only two-disk configurations supported")
5522
5523     disk_space = _ComputeDiskSize(self.disk_template,
5524                                   self.disks[0]["size"], self.disks[1]["size"])
5525
5526     if self.disk_template in constants.DTS_NET_MIRROR:
5527       self.required_nodes = 2
5528     else:
5529       self.required_nodes = 1
5530     request = {
5531       "type": "allocate",
5532       "name": self.name,
5533       "disk_template": self.disk_template,
5534       "tags": self.tags,
5535       "os": self.os,
5536       "vcpus": self.vcpus,
5537       "memory": self.mem_size,
5538       "disks": self.disks,
5539       "disk_space_total": disk_space,
5540       "nics": self.nics,
5541       "required_nodes": self.required_nodes,
5542       }
5543     data["request"] = request
5544
5545   def _AddRelocateInstance(self):
5546     """Add relocate instance data to allocator structure.
5547
5548     This in combination with _IAllocatorGetClusterData will create the
5549     correct structure needed as input for the allocator.
5550
5551     The checks for the completeness of the opcode must have already been
5552     done.
5553
5554     """
5555     instance = self.cfg.GetInstanceInfo(self.name)
5556     if instance is None:
5557       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5558                                    " IAllocator" % self.name)
5559
5560     if instance.disk_template not in constants.DTS_NET_MIRROR:
5561       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5562
5563     if len(instance.secondary_nodes) != 1:
5564       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5565
5566     self.required_nodes = 1
5567
5568     disk_space = _ComputeDiskSize(instance.disk_template,
5569                                   instance.disks[0].size,
5570                                   instance.disks[1].size)
5571
5572     request = {
5573       "type": "relocate",
5574       "name": self.name,
5575       "disk_space_total": disk_space,
5576       "required_nodes": self.required_nodes,
5577       "relocate_from": self.relocate_from,
5578       }
5579     self.in_data["request"] = request
5580
5581   def _BuildInputData(self):
5582     """Build input data structures.
5583
5584     """
5585     self._ComputeClusterData()
5586
5587     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5588       self._AddNewInstance()
5589     else:
5590       self._AddRelocateInstance()
5591
5592     self.in_text = serializer.Dump(self.in_data)
5593
5594   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5595     """Run an instance allocator and return the results.
5596
5597     """
5598     data = self.in_text
5599
5600     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5601
5602     if not isinstance(result, tuple) or len(result) != 4:
5603       raise errors.OpExecError("Invalid result from master iallocator runner")
5604
5605     rcode, stdout, stderr, fail = result
5606
5607     if rcode == constants.IARUN_NOTFOUND:
5608       raise errors.OpExecError("Can't find allocator '%s'" % name)
5609     elif rcode == constants.IARUN_FAILURE:
5610         raise errors.OpExecError("Instance allocator call failed: %s,"
5611                                  " output: %s" %
5612                                  (fail, stdout+stderr))
5613     self.out_text = stdout
5614     if validate:
5615       self._ValidateResult()
5616
5617   def _ValidateResult(self):
5618     """Process the allocator results.
5619
5620     This will process and if successful save the result in
5621     self.out_data and the other parameters.
5622
5623     """
5624     try:
5625       rdict = serializer.Load(self.out_text)
5626     except Exception, err:
5627       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5628
5629     if not isinstance(rdict, dict):
5630       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5631
5632     for key in "success", "info", "nodes":
5633       if key not in rdict:
5634         raise errors.OpExecError("Can't parse iallocator results:"
5635                                  " missing key '%s'" % key)
5636       setattr(self, key, rdict[key])
5637
5638     if not isinstance(rdict["nodes"], list):
5639       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5640                                " is not a list")
5641     self.out_data = rdict
5642
5643
5644 class LUTestAllocator(NoHooksLU):
5645   """Run allocator tests.
5646
5647   This LU runs the allocator tests
5648
5649   """
5650   _OP_REQP = ["direction", "mode", "name"]
5651
5652   def CheckPrereq(self):
5653     """Check prerequisites.
5654
5655     This checks the opcode parameters depending on the director and mode test.
5656
5657     """
5658     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5659       for attr in ["name", "mem_size", "disks", "disk_template",
5660                    "os", "tags", "nics", "vcpus"]:
5661         if not hasattr(self.op, attr):
5662           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5663                                      attr)
5664       iname = self.cfg.ExpandInstanceName(self.op.name)
5665       if iname is not None:
5666         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5667                                    iname)
5668       if not isinstance(self.op.nics, list):
5669         raise errors.OpPrereqError("Invalid parameter 'nics'")
5670       for row in self.op.nics:
5671         if (not isinstance(row, dict) or
5672             "mac" not in row or
5673             "ip" not in row or
5674             "bridge" not in row):
5675           raise errors.OpPrereqError("Invalid contents of the"
5676                                      " 'nics' parameter")
5677       if not isinstance(self.op.disks, list):
5678         raise errors.OpPrereqError("Invalid parameter 'disks'")
5679       if len(self.op.disks) != 2:
5680         raise errors.OpPrereqError("Only two-disk configurations supported")
5681       for row in self.op.disks:
5682         if (not isinstance(row, dict) or
5683             "size" not in row or
5684             not isinstance(row["size"], int) or
5685             "mode" not in row or
5686             row["mode"] not in ['r', 'w']):
5687           raise errors.OpPrereqError("Invalid contents of the"
5688                                      " 'disks' parameter")
5689     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5690       if not hasattr(self.op, "name"):
5691         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5692       fname = self.cfg.ExpandInstanceName(self.op.name)
5693       if fname is None:
5694         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5695                                    self.op.name)
5696       self.op.name = fname
5697       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5698     else:
5699       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5700                                  self.op.mode)
5701
5702     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5703       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5704         raise errors.OpPrereqError("Missing allocator name")
5705     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5706       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5707                                  self.op.direction)
5708
5709   def Exec(self, feedback_fn):
5710     """Run the allocator test.
5711
5712     """
5713     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5714       ial = IAllocator(self.cfg, self.sstore,
5715                        mode=self.op.mode,
5716                        name=self.op.name,
5717                        mem_size=self.op.mem_size,
5718                        disks=self.op.disks,
5719                        disk_template=self.op.disk_template,
5720                        os=self.op.os,
5721                        tags=self.op.tags,
5722                        nics=self.op.nics,
5723                        vcpus=self.op.vcpus,
5724                        )
5725     else:
5726       ial = IAllocator(self.cfg, self.sstore,
5727                        mode=self.op.mode,
5728                        name=self.op.name,
5729                        relocate_from=list(self.relocate_from),
5730                        )
5731
5732     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5733       result = ial.in_text
5734     else:
5735       ial.Run(self.op.allocator, validate=False)
5736       result = ial.out_text
5737     return result