Add an instance_migratable rpc call
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     No nodes should be returned as an empty list (and not None).
137
138     Note that if the HPATH for a LU class is None, this function will
139     not be called.
140
141     """
142     raise NotImplementedError
143
144   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145     """Notify the LU about the results of its hooks.
146
147     This method is called every time a hooks phase is executed, and notifies
148     the Logical Unit about the hooks' result. The LU can then use it to alter
149     its result based on the hooks.  By default the method does nothing and the
150     previous result is passed back unchanged but any LU can define it if it
151     wants to use the local cluster hook-scripts somehow.
152
153     Args:
154       phase: the hooks phase that has just been run
155       hooks_results: the results of the multi-node hooks rpc call
156       feedback_fn: function to send feedback back to the caller
157       lu_result: the previous result this LU had, or None in the PRE phase.
158
159     """
160     return lu_result
161
162
163 class NoHooksLU(LogicalUnit):
164   """Simple LU which runs no hooks.
165
166   This LU is intended as a parent for other LogicalUnits which will
167   run no hooks, in order to reduce duplicate code.
168
169   """
170   HPATH = None
171   HTYPE = None
172
173
174 def _AddHostToEtcHosts(hostname):
175   """Wrapper around utils.SetEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
180
181
182 def _RemoveHostFromEtcHosts(hostname):
183   """Wrapper around utils.RemoveEtcHostsEntry.
184
185   """
186   hi = utils.HostInfo(name=hostname)
187   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
189
190
191 def _GetWantedNodes(lu, nodes):
192   """Returns list of checked and expanded node names.
193
194   Args:
195     nodes: List of nodes (strings) or None for all
196
197   """
198   if not isinstance(nodes, list):
199     raise errors.OpPrereqError("Invalid argument type 'nodes'")
200
201   if nodes:
202     wanted = []
203
204     for name in nodes:
205       node = lu.cfg.ExpandNodeName(name)
206       if node is None:
207         raise errors.OpPrereqError("No such node name '%s'" % name)
208       wanted.append(node)
209
210   else:
211     wanted = lu.cfg.GetNodeList()
212   return utils.NiceSort(wanted)
213
214
215 def _GetWantedInstances(lu, instances):
216   """Returns list of checked and expanded instance names.
217
218   Args:
219     instances: List of instances (strings) or None for all
220
221   """
222   if not isinstance(instances, list):
223     raise errors.OpPrereqError("Invalid argument type 'instances'")
224
225   if instances:
226     wanted = []
227
228     for name in instances:
229       instance = lu.cfg.ExpandInstanceName(name)
230       if instance is None:
231         raise errors.OpPrereqError("No such instance name '%s'" % name)
232       wanted.append(instance)
233
234   else:
235     wanted = lu.cfg.GetInstanceList()
236   return utils.NiceSort(wanted)
237
238
239 def _CheckOutputFields(static, dynamic, selected):
240   """Checks whether all selected fields are valid.
241
242   Args:
243     static: Static fields
244     dynamic: Dynamic fields
245
246   """
247   static_fields = frozenset(static)
248   dynamic_fields = frozenset(dynamic)
249
250   all_fields = static_fields | dynamic_fields
251
252   if not all_fields.issuperset(selected):
253     raise errors.OpPrereqError("Unknown output fields selected: %s"
254                                % ",".join(frozenset(selected).
255                                           difference(all_fields)))
256
257
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259                           memory, vcpus, nics):
260   """Builds instance related env variables for hooks from single variables.
261
262   Args:
263     secondary_nodes: List of secondary nodes as strings
264   """
265   env = {
266     "OP_TARGET": name,
267     "INSTANCE_NAME": name,
268     "INSTANCE_PRIMARY": primary_node,
269     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270     "INSTANCE_OS_TYPE": os_type,
271     "INSTANCE_STATUS": status,
272     "INSTANCE_MEMORY": memory,
273     "INSTANCE_VCPUS": vcpus,
274   }
275
276   if nics:
277     nic_count = len(nics)
278     for idx, (ip, bridge, mac) in enumerate(nics):
279       if ip is None:
280         ip = ""
281       env["INSTANCE_NIC%d_IP" % idx] = ip
282       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
284   else:
285     nic_count = 0
286
287   env["INSTANCE_NIC_COUNT"] = nic_count
288
289   return env
290
291
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293   """Builds instance related env variables for hooks from an object.
294
295   Args:
296     instance: objects.Instance object of instance
297     override: dict of values to override
298   """
299   args = {
300     'name': instance.name,
301     'primary_node': instance.primary_node,
302     'secondary_nodes': instance.secondary_nodes,
303     'os_type': instance.os,
304     'status': instance.os,
305     'memory': instance.memory,
306     'vcpus': instance.vcpus,
307     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308   }
309   if override:
310     args.update(override)
311   return _BuildInstanceHookEnv(**args)
312
313
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315   """Ensure a node has a correct known_hosts entry.
316
317   Args:
318     fullnode - Fully qualified domain name of host. (str)
319     ip       - IPv4 address of host (str)
320     pubkey   - the public key of the cluster
321
322   """
323   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
325   else:
326     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
327
328   inthere = False
329
330   save_lines = []
331   add_lines = []
332   removed = False
333
334   for rawline in f:
335     logger.Debug('read %s' % (repr(rawline),))
336
337     parts = rawline.rstrip('\r\n').split()
338
339     # Ignore unwanted lines
340     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341       fields = parts[0].split(',')
342       key = parts[2]
343
344       haveall = True
345       havesome = False
346       for spec in [ ip, fullnode ]:
347         if spec not in fields:
348           haveall = False
349         if spec in fields:
350           havesome = True
351
352       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353       if haveall and key == pubkey:
354         inthere = True
355         save_lines.append(rawline)
356         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
357         continue
358
359       if havesome and (not haveall or key != pubkey):
360         removed = True
361         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
362         continue
363
364     save_lines.append(rawline)
365
366   if not inthere:
367     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
369
370   if removed:
371     save_lines = save_lines + add_lines
372
373     # Write a new file and replace old.
374     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
375                                    constants.DATA_DIR)
376     newfile = os.fdopen(fd, 'w')
377     try:
378       newfile.write(''.join(save_lines))
379     finally:
380       newfile.close()
381     logger.Debug("Wrote new known_hosts.")
382     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
383
384   elif add_lines:
385     # Simply appending a new line will do the trick.
386     f.seek(0, 2)
387     for add in add_lines:
388       f.write(add)
389
390   f.close()
391
392
393 def _HasValidVG(vglist, vgname):
394   """Checks if the volume group list is valid.
395
396   A non-None return value means there's an error, and the return value
397   is the error message.
398
399   """
400   vgsize = vglist.get(vgname, None)
401   if vgsize is None:
402     return "volume group '%s' missing" % vgname
403   elif vgsize < 20480:
404     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
405             (vgname, vgsize))
406   return None
407
408
409 def _InitSSHSetup(node):
410   """Setup the SSH configuration for the cluster.
411
412
413   This generates a dsa keypair for root, adds the pub key to the
414   permitted hosts and adds the hostkey to its own known hosts.
415
416   Args:
417     node: the name of this host as a fqdn
418
419   """
420   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
421
422   for name in priv_key, pub_key:
423     if os.path.exists(name):
424       utils.CreateBackup(name)
425     utils.RemoveFile(name)
426
427   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
428                          "-f", priv_key,
429                          "-q", "-N", ""])
430   if result.failed:
431     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
432                              result.output)
433
434   f = open(pub_key, 'r')
435   try:
436     utils.AddAuthorizedKey(auth_keys, f.read(8192))
437   finally:
438     f.close()
439
440
441 def _InitGanetiServerSetup(ss):
442   """Setup the necessary configuration for the initial node daemon.
443
444   This creates the nodepass file containing the shared password for
445   the cluster and also generates the SSL certificate.
446
447   """
448   # Create pseudo random password
449   randpass = sha.new(os.urandom(64)).hexdigest()
450   # and write it into sstore
451   ss.SetKey(ss.SS_NODED_PASS, randpass)
452
453   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454                          "-days", str(365*5), "-nodes", "-x509",
455                          "-keyout", constants.SSL_CERT_FILE,
456                          "-out", constants.SSL_CERT_FILE, "-batch"])
457   if result.failed:
458     raise errors.OpExecError("could not generate server ssl cert, command"
459                              " %s had exitcode %s and error message %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462   os.chmod(constants.SSL_CERT_FILE, 0400)
463
464   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
465
466   if result.failed:
467     raise errors.OpExecError("Could not start the node daemon, command %s"
468                              " had exitcode %s and error %s" %
469                              (result.cmd, result.exit_code, result.output))
470
471
472 def _CheckInstanceBridgesExist(instance):
473   """Check that the brigdes needed by an instance exist.
474
475   """
476   # check bridges existance
477   brlist = [nic.bridge for nic in instance.nics]
478   if not rpc.call_bridges_exist(instance.primary_node, brlist):
479     raise errors.OpPrereqError("one or more target bridges %s does not"
480                                " exist on destination node '%s'" %
481                                (brlist, instance.primary_node))
482
483
484 class LUInitCluster(LogicalUnit):
485   """Initialise the cluster.
486
487   """
488   HPATH = "cluster-init"
489   HTYPE = constants.HTYPE_CLUSTER
490   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491               "def_bridge", "master_netdev"]
492   REQ_CLUSTER = False
493
494   def BuildHooksEnv(self):
495     """Build hooks env.
496
497     Notes: Since we don't require a cluster, we must manually add
498     ourselves in the post-run node list.
499
500     """
501     env = {"OP_TARGET": self.op.cluster_name}
502     return env, [], [self.hostname.name]
503
504   def CheckPrereq(self):
505     """Verify that the passed name is a valid one.
506
507     """
508     if config.ConfigWriter.IsCluster():
509       raise errors.OpPrereqError("Cluster is already initialised")
510
511     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512       if not os.path.exists(constants.VNC_PASSWORD_FILE):
513         raise errors.OpPrereqError("Please prepare the cluster VNC"
514                                    "password file %s" %
515                                    constants.VNC_PASSWORD_FILE)
516
517     self.hostname = hostname = utils.HostInfo()
518
519     if hostname.ip.startswith("127."):
520       raise errors.OpPrereqError("This host's IP resolves to the private"
521                                  " range (%s). Please fix DNS or %s." %
522                                  (hostname.ip, constants.ETC_HOSTS))
523
524     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525                          source=constants.LOCALHOST_IP_ADDRESS):
526       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527                                  " to %s,\nbut this ip address does not"
528                                  " belong to this host."
529                                  " Aborting." % hostname.ip)
530
531     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
532
533     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
534                      timeout=5):
535       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
536
537     secondary_ip = getattr(self.op, "secondary_ip", None)
538     if secondary_ip and not utils.IsValidIP(secondary_ip):
539       raise errors.OpPrereqError("Invalid secondary ip given")
540     if (secondary_ip and
541         secondary_ip != hostname.ip and
542         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543                            source=constants.LOCALHOST_IP_ADDRESS))):
544       raise errors.OpPrereqError("You gave %s as secondary IP,"
545                                  " but it does not belong to this host." %
546                                  secondary_ip)
547     self.secondary_ip = secondary_ip
548
549     # checks presence of the volume group given
550     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
551
552     if vgstatus:
553       raise errors.OpPrereqError("Error: %s" % vgstatus)
554
555     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
556                     self.op.mac_prefix):
557       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
558                                  self.op.mac_prefix)
559
560     if self.op.hypervisor_type not in constants.HYPER_TYPES:
561       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562                                  self.op.hypervisor_type)
563
564     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
565     if result.failed:
566       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567                                  (self.op.master_netdev,
568                                   result.output.strip()))
569
570     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572       raise errors.OpPrereqError("Init.d script '%s' missing or not"
573                                  " executable." % constants.NODE_INITD_SCRIPT)
574
575   def Exec(self, feedback_fn):
576     """Initialize the cluster.
577
578     """
579     clustername = self.clustername
580     hostname = self.hostname
581
582     # set up the simple store
583     self.sstore = ss = ssconf.SimpleStore()
584     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
589
590     # set up the inter-node password and certificate
591     _InitGanetiServerSetup(ss)
592
593     # start the master ip
594     rpc.call_node_start_master(hostname.name)
595
596     # set up ssh config and /etc/hosts
597     f = open(constants.SSH_HOST_RSA_PUB, 'r')
598     try:
599       sshline = f.read()
600     finally:
601       f.close()
602     sshkey = sshline.split(" ")[1]
603
604     _AddHostToEtcHosts(hostname.name)
605
606     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
607
608     _InitSSHSetup(hostname.name)
609
610     # init of cluster config file
611     self.cfg = cfgw = config.ConfigWriter()
612     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613                     sshkey, self.op.mac_prefix,
614                     self.op.vg_name, self.op.def_bridge)
615
616
617 class LUDestroyCluster(NoHooksLU):
618   """Logical unit for destroying the cluster.
619
620   """
621   _OP_REQP = []
622
623   def CheckPrereq(self):
624     """Check prerequisites.
625
626     This checks whether the cluster is empty.
627
628     Any errors are signalled by raising errors.OpPrereqError.
629
630     """
631     master = self.sstore.GetMasterNode()
632
633     nodelist = self.cfg.GetNodeList()
634     if len(nodelist) != 1 or nodelist[0] != master:
635       raise errors.OpPrereqError("There are still %d node(s) in"
636                                  " this cluster." % (len(nodelist) - 1))
637     instancelist = self.cfg.GetInstanceList()
638     if instancelist:
639       raise errors.OpPrereqError("There are still %d instance(s) in"
640                                  " this cluster." % len(instancelist))
641
642   def Exec(self, feedback_fn):
643     """Destroys the cluster.
644
645     """
646     master = self.sstore.GetMasterNode()
647     if not rpc.call_node_stop_master(master):
648       raise errors.OpExecError("Could not disable the master role")
649     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650     utils.CreateBackup(priv_key)
651     utils.CreateBackup(pub_key)
652     rpc.call_node_leave_cluster(master)
653
654
655 class LUVerifyCluster(LogicalUnit):
656   """Verifies the cluster status.
657
658   """
659   HPATH = "cluster-verify"
660   HTYPE = constants.HTYPE_CLUSTER
661   _OP_REQP = ["skip_checks"]
662
663   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664                   remote_version, feedback_fn):
665     """Run multiple tests against a node.
666
667     Test list:
668       - compares ganeti version
669       - checks vg existance and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     Args:
674       node: name of the node to check
675       file_list: required list of files
676       local_cksum: dictionary of local files and their checksums
677
678     """
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     if not remote_version:
682       feedback_fn("  - ERROR: connection to %s failed" % (node))
683       return True
684
685     if local_version != remote_version:
686       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
687                       (local_version, node, remote_version))
688       return True
689
690     # checks vg existance and size > 20G
691
692     bad = False
693     if not vglist:
694       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
695                       (node,))
696       bad = True
697     else:
698       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
699       if vgstatus:
700         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
701         bad = True
702
703     # checks config file checksum
704     # checks ssh to any
705
706     if 'filelist' not in node_result:
707       bad = True
708       feedback_fn("  - ERROR: node hasn't returned file checksum data")
709     else:
710       remote_cksum = node_result['filelist']
711       for file_name in file_list:
712         if file_name not in remote_cksum:
713           bad = True
714           feedback_fn("  - ERROR: file '%s' missing" % file_name)
715         elif remote_cksum[file_name] != local_cksum[file_name]:
716           bad = True
717           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
718
719     if 'nodelist' not in node_result:
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
722     else:
723       if node_result['nodelist']:
724         bad = True
725         for node in node_result['nodelist']:
726           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
727                           (node, node_result['nodelist'][node]))
728     if 'node-net-test' not in node_result:
729       bad = True
730       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
731     else:
732       if node_result['node-net-test']:
733         bad = True
734         nlist = utils.NiceSort(node_result['node-net-test'].keys())
735         for node in nlist:
736           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
737                           (node, node_result['node-net-test'][node]))
738
739     hyp_result = node_result.get('hypervisor', None)
740     if hyp_result is not None:
741       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
742     return bad
743
744   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745                       node_instance, feedback_fn):
746     """Verify an instance.
747
748     This function checks to see if the required block devices are
749     available on the instance's node.
750
751     """
752     bad = False
753
754     node_current = instanceconfig.primary_node
755
756     node_vol_should = {}
757     instanceconfig.MapLVsByNode(node_vol_should)
758
759     for node in node_vol_should:
760       for volume in node_vol_should[node]:
761         if node not in node_vol_is or volume not in node_vol_is[node]:
762           feedback_fn("  - ERROR: volume %s missing on node %s" %
763                           (volume, node))
764           bad = True
765
766     if not instanceconfig.status == 'down':
767       if (node_current not in node_instance or
768           not instance in node_instance[node_current]):
769         feedback_fn("  - ERROR: instance %s not running on node %s" %
770                         (instance, node_current))
771         bad = True
772
773     for node in node_instance:
774       if (not node == node_current):
775         if instance in node_instance[node]:
776           feedback_fn("  - ERROR: instance %s should not run on node %s" %
777                           (instance, node))
778           bad = True
779
780     return bad
781
782   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783     """Verify if there are any unknown volumes in the cluster.
784
785     The .os, .swap and backup volumes are ignored. All other volumes are
786     reported as unknown.
787
788     """
789     bad = False
790
791     for node in node_vol_is:
792       for volume in node_vol_is[node]:
793         if node not in node_vol_should or volume not in node_vol_should[node]:
794           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
795                       (volume, node))
796           bad = True
797     return bad
798
799   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800     """Verify the list of running instances.
801
802     This checks what instances are running but unknown to the cluster.
803
804     """
805     bad = False
806     for node in node_instance:
807       for runninginstance in node_instance[node]:
808         if runninginstance not in instancelist:
809           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
810                           (runninginstance, node))
811           bad = True
812     return bad
813
814   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815     """Verify N+1 Memory Resilience.
816
817     Check that if one single node dies we can still start all the instances it
818     was primary for.
819
820     """
821     bad = False
822
823     for node, nodeinfo in node_info.iteritems():
824       # This code checks that every node which is now listed as secondary has
825       # enough memory to host all instances it is supposed to should a single
826       # other node in the cluster fail.
827       # FIXME: not ready for failover to an arbitrary node
828       # FIXME: does not support file-backed instances
829       # WARNING: we currently take into account down instances as well as up
830       # ones, considering that even if they're down someone might want to start
831       # them even in the event of a node failure.
832       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
833         needed_mem = 0
834         for instance in instances:
835           needed_mem += instance_cfg[instance].memory
836         if nodeinfo['mfree'] < needed_mem:
837           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
838                       " failovers should node %s fail" % (node, prinode))
839           bad = True
840     return bad
841
842   def CheckPrereq(self):
843     """Check prerequisites.
844
845     Transform the list of checks we're going to skip into a set and check that
846     all its members are valid.
847
848     """
849     self.skip_set = frozenset(self.op.skip_checks)
850     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851       raise errors.OpPrereqError("Invalid checks to be skipped specified")
852
853   def BuildHooksEnv(self):
854     """Build hooks env.
855
856     Cluster-Verify hooks just rone in the post phase and their failure makes
857     the output be logged in the verify output and the verification to fail.
858
859     """
860     all_nodes = self.cfg.GetNodeList()
861     tags = self.cfg.GetClusterInfo().GetTags()
862     # TODO: populate the environment with useful information for verify hooks
863     env = {
864       "CLUSTER_TAGS": " ".join(tags),
865       }
866     return env, [], all_nodes
867
868   def Exec(self, feedback_fn):
869     """Verify integrity of cluster, performing various test on nodes.
870
871     """
872     bad = False
873     feedback_fn("* Verifying global settings")
874     for msg in self.cfg.VerifyConfig():
875       feedback_fn("  - ERROR: %s" % msg)
876
877     vg_name = self.cfg.GetVGName()
878     nodelist = utils.NiceSort(self.cfg.GetNodeList())
879     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881     i_non_redundant = [] # Non redundant instances
882     node_volume = {}
883     node_instance = {}
884     node_info = {}
885     instance_cfg = {}
886
887     # FIXME: verify OS list
888     # do local checksums
889     file_names = list(self.sstore.GetFileList())
890     file_names.append(constants.SSL_CERT_FILE)
891     file_names.append(constants.CLUSTER_CONF_FILE)
892     local_checksums = utils.FingerprintFiles(file_names)
893
894     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896     all_instanceinfo = rpc.call_instance_list(nodelist)
897     all_vglist = rpc.call_vg_list(nodelist)
898     node_verify_param = {
899       'filelist': file_names,
900       'nodelist': nodelist,
901       'hypervisor': None,
902       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903                         for node in nodeinfo]
904       }
905     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906     all_rversion = rpc.call_version(nodelist)
907     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
908
909     incomplete_nodeinfo = False
910
911     for node in nodelist:
912       feedback_fn("* Verifying node %s" % node)
913       result = self._VerifyNode(node, file_names, local_checksums,
914                                 all_vglist[node], all_nvinfo[node],
915                                 all_rversion[node], feedback_fn)
916       bad = bad or result
917
918       # node_volume
919       volumeinfo = all_volumeinfo[node]
920
921       if isinstance(volumeinfo, basestring):
922         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
923                     (node, volumeinfo[-400:].encode('string_escape')))
924         bad = True
925         node_volume[node] = {}
926       elif not isinstance(volumeinfo, dict):
927         feedback_fn("  - ERROR: connection to %s failed" % (node,))
928         bad = True
929         incomplete_nodeinfo = True
930         continue
931       else:
932         node_volume[node] = volumeinfo
933
934       # node_instance
935       nodeinstance = all_instanceinfo[node]
936       if type(nodeinstance) != list:
937         feedback_fn("  - ERROR: connection to %s failed" % (node,))
938         bad = True
939         incomplete_nodeinfo = True
940         continue
941
942       node_instance[node] = nodeinstance
943
944       # node_info
945       nodeinfo = all_ninfo[node]
946       if not isinstance(nodeinfo, dict):
947         feedback_fn("  - ERROR: connection to %s failed" % (node,))
948         bad = True
949         incomplete_nodeinfo = True
950         continue
951
952       try:
953         node_info[node] = {
954           "mfree": int(nodeinfo['memory_free']),
955           "dfree": int(nodeinfo['vg_free']),
956           "pinst": [],
957           "sinst": [],
958           # dictionary holding all instances this node is secondary for,
959           # grouped by their primary node. Each key is a cluster node, and each
960           # value is a list of instances which have the key as primary and the
961           # current node as secondary.  this is handy to calculate N+1 memory
962           # availability if you can only failover from a primary to its
963           # secondary.
964           "sinst-by-pnode": {},
965         }
966       except (ValueError, TypeError):
967         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
968         bad = True
969         incomplete_nodeinfo = True
970         continue
971
972     node_vol_should = {}
973
974     for instance in instancelist:
975       feedback_fn("* Verifying instance %s" % instance)
976       inst_config = self.cfg.GetInstanceInfo(instance)
977       result =  self._VerifyInstance(instance, inst_config, node_volume,
978                                      node_instance, feedback_fn)
979       bad = bad or result
980
981       inst_config.MapLVsByNode(node_vol_should)
982
983       instance_cfg[instance] = inst_config
984
985       pnode = inst_config.primary_node
986       if pnode in node_info:
987         node_info[pnode]['pinst'].append(instance)
988       else:
989         feedback_fn("  - ERROR: instance %s, connection to primary node"
990                     " %s failed" % (instance, pnode))
991         bad = True
992
993       # If the instance is non-redundant we cannot survive losing its primary
994       # node, so we are not N+1 compliant. On the other hand we have no disk
995       # templates with more than one secondary so that situation is not well
996       # supported either.
997       # FIXME: does not support file-backed instances
998       if len(inst_config.secondary_nodes) == 0:
999         i_non_redundant.append(instance)
1000       elif len(inst_config.secondary_nodes) > 1:
1001         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1002                     % instance)
1003
1004       for snode in inst_config.secondary_nodes:
1005         if snode in node_info:
1006           node_info[snode]['sinst'].append(instance)
1007           if pnode not in node_info[snode]['sinst-by-pnode']:
1008             node_info[snode]['sinst-by-pnode'][pnode] = []
1009           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1010         else:
1011           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1012                       " %s failed" % (instance, snode))
1013
1014     feedback_fn("* Verifying orphan volumes")
1015     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1016                                        feedback_fn)
1017     bad = bad or result
1018
1019     feedback_fn("* Verifying remaining instances")
1020     result = self._VerifyOrphanInstances(instancelist, node_instance,
1021                                          feedback_fn)
1022     bad = bad or result
1023
1024     if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1025         not incomplete_nodeinfo):
1026       feedback_fn("* Verifying N+1 Memory redundancy")
1027       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1028       bad = bad or result
1029
1030     feedback_fn("* Other Notes")
1031     if i_non_redundant:
1032       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1033                   % len(i_non_redundant))
1034
1035     return int(bad)
1036
1037   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1038     """Analize the post-hooks' result, handle it, and send some
1039     nicely-formatted feedback back to the user.
1040
1041     Args:
1042       phase: the hooks phase that has just been run
1043       hooks_results: the results of the multi-node hooks rpc call
1044       feedback_fn: function to send feedback back to the caller
1045       lu_result: previous Exec result
1046
1047     """
1048     # We only really run POST phase hooks, and are only interested in their results
1049     if phase == constants.HOOKS_PHASE_POST:
1050       # Used to change hooks' output to proper indentation
1051       indent_re = re.compile('^', re.M)
1052       feedback_fn("* Hooks Results")
1053       if not hooks_results:
1054         feedback_fn("  - ERROR: general communication failure")
1055         lu_result = 1
1056       else:
1057         for node_name in hooks_results:
1058           show_node_header = True
1059           res = hooks_results[node_name]
1060           if res is False or not isinstance(res, list):
1061             feedback_fn("    Communication failure")
1062             lu_result = 1
1063             continue
1064           for script, hkr, output in res:
1065             if hkr == constants.HKR_FAIL:
1066               # The node header is only shown once, if there are
1067               # failing hooks on that node
1068               if show_node_header:
1069                 feedback_fn("  Node %s:" % node_name)
1070                 show_node_header = False
1071               feedback_fn("    ERROR: Script %s failed, output:" % script)
1072               output = indent_re.sub('      ', output)
1073               feedback_fn("%s" % output)
1074               lu_result = 1
1075
1076       return lu_result
1077
1078
1079 class LUVerifyDisks(NoHooksLU):
1080   """Verifies the cluster disks status.
1081
1082   """
1083   _OP_REQP = []
1084
1085   def CheckPrereq(self):
1086     """Check prerequisites.
1087
1088     This has no prerequisites.
1089
1090     """
1091     pass
1092
1093   def Exec(self, feedback_fn):
1094     """Verify integrity of cluster disks.
1095
1096     """
1097     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1098
1099     vg_name = self.cfg.GetVGName()
1100     nodes = utils.NiceSort(self.cfg.GetNodeList())
1101     instances = [self.cfg.GetInstanceInfo(name)
1102                  for name in self.cfg.GetInstanceList()]
1103
1104     nv_dict = {}
1105     for inst in instances:
1106       inst_lvs = {}
1107       if (inst.status != "up" or
1108           inst.disk_template not in constants.DTS_NET_MIRROR):
1109         continue
1110       inst.MapLVsByNode(inst_lvs)
1111       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1112       for node, vol_list in inst_lvs.iteritems():
1113         for vol in vol_list:
1114           nv_dict[(node, vol)] = inst
1115
1116     if not nv_dict:
1117       return result
1118
1119     node_lvs = rpc.call_volume_list(nodes, vg_name)
1120
1121     to_act = set()
1122     for node in nodes:
1123       # node_volume
1124       lvs = node_lvs[node]
1125
1126       if isinstance(lvs, basestring):
1127         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1128         res_nlvm[node] = lvs
1129       elif not isinstance(lvs, dict):
1130         logger.Info("connection to node %s failed or invalid data returned" %
1131                     (node,))
1132         res_nodes.append(node)
1133         continue
1134
1135       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1136         inst = nv_dict.pop((node, lv_name), None)
1137         if (not lv_online and inst is not None
1138             and inst.name not in res_instances):
1139           res_instances.append(inst.name)
1140
1141     # any leftover items in nv_dict are missing LVs, let's arrange the
1142     # data better
1143     for key, inst in nv_dict.iteritems():
1144       if inst.name not in res_missing:
1145         res_missing[inst.name] = []
1146       res_missing[inst.name].append(key)
1147
1148     return result
1149
1150
1151 class LURenameCluster(LogicalUnit):
1152   """Rename the cluster.
1153
1154   """
1155   HPATH = "cluster-rename"
1156   HTYPE = constants.HTYPE_CLUSTER
1157   _OP_REQP = ["name"]
1158
1159   def BuildHooksEnv(self):
1160     """Build hooks env.
1161
1162     """
1163     env = {
1164       "OP_TARGET": self.sstore.GetClusterName(),
1165       "NEW_NAME": self.op.name,
1166       }
1167     mn = self.sstore.GetMasterNode()
1168     return env, [mn], [mn]
1169
1170   def CheckPrereq(self):
1171     """Verify that the passed name is a valid one.
1172
1173     """
1174     hostname = utils.HostInfo(self.op.name)
1175
1176     new_name = hostname.name
1177     self.ip = new_ip = hostname.ip
1178     old_name = self.sstore.GetClusterName()
1179     old_ip = self.sstore.GetMasterIP()
1180     if new_name == old_name and new_ip == old_ip:
1181       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1182                                  " cluster has changed")
1183     if new_ip != old_ip:
1184       result = utils.RunCmd(["fping", "-q", new_ip])
1185       if not result.failed:
1186         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1187                                    " reachable on the network. Aborting." %
1188                                    new_ip)
1189
1190     self.op.name = new_name
1191
1192   def Exec(self, feedback_fn):
1193     """Rename the cluster.
1194
1195     """
1196     clustername = self.op.name
1197     ip = self.ip
1198     ss = self.sstore
1199
1200     # shutdown the master IP
1201     master = ss.GetMasterNode()
1202     if not rpc.call_node_stop_master(master):
1203       raise errors.OpExecError("Could not disable the master role")
1204
1205     try:
1206       # modify the sstore
1207       ss.SetKey(ss.SS_MASTER_IP, ip)
1208       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1209
1210       # Distribute updated ss config to all nodes
1211       myself = self.cfg.GetNodeInfo(master)
1212       dist_nodes = self.cfg.GetNodeList()
1213       if myself.name in dist_nodes:
1214         dist_nodes.remove(myself.name)
1215
1216       logger.Debug("Copying updated ssconf data to all nodes")
1217       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1218         fname = ss.KeyToFilename(keyname)
1219         result = rpc.call_upload_file(dist_nodes, fname)
1220         for to_node in dist_nodes:
1221           if not result[to_node]:
1222             logger.Error("copy of file %s to node %s failed" %
1223                          (fname, to_node))
1224     finally:
1225       if not rpc.call_node_start_master(master):
1226         logger.Error("Could not re-enable the master role on the master,"
1227                      " please restart manually.")
1228
1229
1230 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1231   """Sleep and poll for an instance's disk to sync.
1232
1233   """
1234   if not instance.disks:
1235     return True
1236
1237   if not oneshot:
1238     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1239
1240   node = instance.primary_node
1241
1242   for dev in instance.disks:
1243     cfgw.SetDiskID(dev, node)
1244
1245   retries = 0
1246   while True:
1247     max_time = 0
1248     done = True
1249     cumul_degraded = False
1250     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1251     if not rstats:
1252       proc.LogWarning("Can't get any data from node %s" % node)
1253       retries += 1
1254       if retries >= 10:
1255         raise errors.RemoteError("Can't contact node %s for mirror data,"
1256                                  " aborting." % node)
1257       time.sleep(6)
1258       continue
1259     retries = 0
1260     for i in range(len(rstats)):
1261       mstat = rstats[i]
1262       if mstat is None:
1263         proc.LogWarning("Can't compute data for node %s/%s" %
1264                         (node, instance.disks[i].iv_name))
1265         continue
1266       # we ignore the ldisk parameter
1267       perc_done, est_time, is_degraded, _ = mstat
1268       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1269       if perc_done is not None:
1270         done = False
1271         if est_time is not None:
1272           rem_time = "%d estimated seconds remaining" % est_time
1273           max_time = est_time
1274         else:
1275           rem_time = "no time estimate"
1276         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1277                      (instance.disks[i].iv_name, perc_done, rem_time))
1278     if done or oneshot:
1279       break
1280
1281     if unlock:
1282       utils.Unlock('cmd')
1283     try:
1284       time.sleep(min(60, max_time))
1285     finally:
1286       if unlock:
1287         utils.Lock('cmd')
1288
1289   if done:
1290     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1291   return not cumul_degraded
1292
1293
1294 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1295   """Check that mirrors are not degraded.
1296
1297   The ldisk parameter, if True, will change the test from the
1298   is_degraded attribute (which represents overall non-ok status for
1299   the device(s)) to the ldisk (representing the local storage status).
1300
1301   """
1302   cfgw.SetDiskID(dev, node)
1303   if ldisk:
1304     idx = 6
1305   else:
1306     idx = 5
1307
1308   result = True
1309   if on_primary or dev.AssembleOnSecondary():
1310     rstats = rpc.call_blockdev_find(node, dev)
1311     if not rstats:
1312       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1313       result = False
1314     else:
1315       result = result and (not rstats[idx])
1316   if dev.children:
1317     for child in dev.children:
1318       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1319
1320   return result
1321
1322
1323 class LUDiagnoseOS(NoHooksLU):
1324   """Logical unit for OS diagnose/query.
1325
1326   """
1327   _OP_REQP = ["output_fields", "names"]
1328
1329   def CheckPrereq(self):
1330     """Check prerequisites.
1331
1332     This always succeeds, since this is a pure query LU.
1333
1334     """
1335     if self.op.names:
1336       raise errors.OpPrereqError("Selective OS query not supported")
1337
1338     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1339     _CheckOutputFields(static=[],
1340                        dynamic=self.dynamic_fields,
1341                        selected=self.op.output_fields)
1342
1343   @staticmethod
1344   def _DiagnoseByOS(node_list, rlist):
1345     """Remaps a per-node return list into an a per-os per-node dictionary
1346
1347       Args:
1348         node_list: a list with the names of all nodes
1349         rlist: a map with node names as keys and OS objects as values
1350
1351       Returns:
1352         map: a map with osnames as keys and as value another map, with
1353              nodes as
1354              keys and list of OS objects as values
1355              e.g. {"debian-etch": {"node1": [<object>,...],
1356                                    "node2": [<object>,]}
1357                   }
1358
1359     """
1360     all_os = {}
1361     for node_name, nr in rlist.iteritems():
1362       if not nr:
1363         continue
1364       for os in nr:
1365         if os.name not in all_os:
1366           # build a list of nodes for this os containing empty lists
1367           # for each node in node_list
1368           all_os[os.name] = {}
1369           for nname in node_list:
1370             all_os[os.name][nname] = []
1371         all_os[os.name][node_name].append(os)
1372     return all_os
1373
1374   def Exec(self, feedback_fn):
1375     """Compute the list of OSes.
1376
1377     """
1378     node_list = self.cfg.GetNodeList()
1379     node_data = rpc.call_os_diagnose(node_list)
1380     if node_data == False:
1381       raise errors.OpExecError("Can't gather the list of OSes")
1382     pol = self._DiagnoseByOS(node_list, node_data)
1383     output = []
1384     for os_name, os_data in pol.iteritems():
1385       row = []
1386       for field in self.op.output_fields:
1387         if field == "name":
1388           val = os_name
1389         elif field == "valid":
1390           val = utils.all([osl and osl[0] for osl in os_data.values()])
1391         elif field == "node_status":
1392           val = {}
1393           for node_name, nos_list in os_data.iteritems():
1394             val[node_name] = [(v.status, v.path) for v in nos_list]
1395         else:
1396           raise errors.ParameterError(field)
1397         row.append(val)
1398       output.append(row)
1399
1400     return output
1401
1402
1403 class LURemoveNode(LogicalUnit):
1404   """Logical unit for removing a node.
1405
1406   """
1407   HPATH = "node-remove"
1408   HTYPE = constants.HTYPE_NODE
1409   _OP_REQP = ["node_name"]
1410
1411   def BuildHooksEnv(self):
1412     """Build hooks env.
1413
1414     This doesn't run on the target node in the pre phase as a failed
1415     node would not allows itself to run.
1416
1417     """
1418     env = {
1419       "OP_TARGET": self.op.node_name,
1420       "NODE_NAME": self.op.node_name,
1421       }
1422     all_nodes = self.cfg.GetNodeList()
1423     all_nodes.remove(self.op.node_name)
1424     return env, all_nodes, all_nodes
1425
1426   def CheckPrereq(self):
1427     """Check prerequisites.
1428
1429     This checks:
1430      - the node exists in the configuration
1431      - it does not have primary or secondary instances
1432      - it's not the master
1433
1434     Any errors are signalled by raising errors.OpPrereqError.
1435
1436     """
1437     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1438     if node is None:
1439       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1440
1441     instance_list = self.cfg.GetInstanceList()
1442
1443     masternode = self.sstore.GetMasterNode()
1444     if node.name == masternode:
1445       raise errors.OpPrereqError("Node is the master node,"
1446                                  " you need to failover first.")
1447
1448     for instance_name in instance_list:
1449       instance = self.cfg.GetInstanceInfo(instance_name)
1450       if node.name == instance.primary_node:
1451         raise errors.OpPrereqError("Instance %s still running on the node,"
1452                                    " please remove first." % instance_name)
1453       if node.name in instance.secondary_nodes:
1454         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1455                                    " please remove first." % instance_name)
1456     self.op.node_name = node.name
1457     self.node = node
1458
1459   def Exec(self, feedback_fn):
1460     """Removes the node from the cluster.
1461
1462     """
1463     node = self.node
1464     logger.Info("stopping the node daemon and removing configs from node %s" %
1465                 node.name)
1466
1467     rpc.call_node_leave_cluster(node.name)
1468
1469     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1470
1471     logger.Info("Removing node %s from config" % node.name)
1472
1473     self.cfg.RemoveNode(node.name)
1474
1475     _RemoveHostFromEtcHosts(node.name)
1476
1477
1478 class LUQueryNodes(NoHooksLU):
1479   """Logical unit for querying nodes.
1480
1481   """
1482   _OP_REQP = ["output_fields", "names"]
1483
1484   def CheckPrereq(self):
1485     """Check prerequisites.
1486
1487     This checks that the fields required are valid output fields.
1488
1489     """
1490     self.dynamic_fields = frozenset([
1491       "dtotal", "dfree",
1492       "mtotal", "mnode", "mfree",
1493       "bootid",
1494       "ctotal",
1495       ])
1496
1497     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1498                                "pinst_list", "sinst_list",
1499                                "pip", "sip", "tags"],
1500                        dynamic=self.dynamic_fields,
1501                        selected=self.op.output_fields)
1502
1503     self.wanted = _GetWantedNodes(self, self.op.names)
1504
1505   def Exec(self, feedback_fn):
1506     """Computes the list of nodes and their attributes.
1507
1508     """
1509     nodenames = self.wanted
1510     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1511
1512     # begin data gathering
1513
1514     if self.dynamic_fields.intersection(self.op.output_fields):
1515       live_data = {}
1516       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1517       for name in nodenames:
1518         nodeinfo = node_data.get(name, None)
1519         if nodeinfo:
1520           live_data[name] = {
1521             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1522             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1523             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1524             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1525             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1526             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1527             "bootid": nodeinfo['bootid'],
1528             }
1529         else:
1530           live_data[name] = {}
1531     else:
1532       live_data = dict.fromkeys(nodenames, {})
1533
1534     node_to_primary = dict([(name, set()) for name in nodenames])
1535     node_to_secondary = dict([(name, set()) for name in nodenames])
1536
1537     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1538                              "sinst_cnt", "sinst_list"))
1539     if inst_fields & frozenset(self.op.output_fields):
1540       instancelist = self.cfg.GetInstanceList()
1541
1542       for instance_name in instancelist:
1543         inst = self.cfg.GetInstanceInfo(instance_name)
1544         if inst.primary_node in node_to_primary:
1545           node_to_primary[inst.primary_node].add(inst.name)
1546         for secnode in inst.secondary_nodes:
1547           if secnode in node_to_secondary:
1548             node_to_secondary[secnode].add(inst.name)
1549
1550     # end data gathering
1551
1552     output = []
1553     for node in nodelist:
1554       node_output = []
1555       for field in self.op.output_fields:
1556         if field == "name":
1557           val = node.name
1558         elif field == "pinst_list":
1559           val = list(node_to_primary[node.name])
1560         elif field == "sinst_list":
1561           val = list(node_to_secondary[node.name])
1562         elif field == "pinst_cnt":
1563           val = len(node_to_primary[node.name])
1564         elif field == "sinst_cnt":
1565           val = len(node_to_secondary[node.name])
1566         elif field == "pip":
1567           val = node.primary_ip
1568         elif field == "sip":
1569           val = node.secondary_ip
1570         elif field == "tags":
1571           val = list(node.GetTags())
1572         elif field in self.dynamic_fields:
1573           val = live_data[node.name].get(field, None)
1574         else:
1575           raise errors.ParameterError(field)
1576         node_output.append(val)
1577       output.append(node_output)
1578
1579     return output
1580
1581
1582 class LUQueryNodeVolumes(NoHooksLU):
1583   """Logical unit for getting volumes on node(s).
1584
1585   """
1586   _OP_REQP = ["nodes", "output_fields"]
1587
1588   def CheckPrereq(self):
1589     """Check prerequisites.
1590
1591     This checks that the fields required are valid output fields.
1592
1593     """
1594     self.nodes = _GetWantedNodes(self, self.op.nodes)
1595
1596     _CheckOutputFields(static=["node"],
1597                        dynamic=["phys", "vg", "name", "size", "instance"],
1598                        selected=self.op.output_fields)
1599
1600
1601   def Exec(self, feedback_fn):
1602     """Computes the list of nodes and their attributes.
1603
1604     """
1605     nodenames = self.nodes
1606     volumes = rpc.call_node_volumes(nodenames)
1607
1608     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1609              in self.cfg.GetInstanceList()]
1610
1611     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1612
1613     output = []
1614     for node in nodenames:
1615       if node not in volumes or not volumes[node]:
1616         continue
1617
1618       node_vols = volumes[node][:]
1619       node_vols.sort(key=lambda vol: vol['dev'])
1620
1621       for vol in node_vols:
1622         node_output = []
1623         for field in self.op.output_fields:
1624           if field == "node":
1625             val = node
1626           elif field == "phys":
1627             val = vol['dev']
1628           elif field == "vg":
1629             val = vol['vg']
1630           elif field == "name":
1631             val = vol['name']
1632           elif field == "size":
1633             val = int(float(vol['size']))
1634           elif field == "instance":
1635             for inst in ilist:
1636               if node not in lv_by_node[inst]:
1637                 continue
1638               if vol['name'] in lv_by_node[inst][node]:
1639                 val = inst.name
1640                 break
1641             else:
1642               val = '-'
1643           else:
1644             raise errors.ParameterError(field)
1645           node_output.append(str(val))
1646
1647         output.append(node_output)
1648
1649     return output
1650
1651
1652 class LUAddNode(LogicalUnit):
1653   """Logical unit for adding node to the cluster.
1654
1655   """
1656   HPATH = "node-add"
1657   HTYPE = constants.HTYPE_NODE
1658   _OP_REQP = ["node_name"]
1659
1660   def BuildHooksEnv(self):
1661     """Build hooks env.
1662
1663     This will run on all nodes before, and on all nodes + the new node after.
1664
1665     """
1666     env = {
1667       "OP_TARGET": self.op.node_name,
1668       "NODE_NAME": self.op.node_name,
1669       "NODE_PIP": self.op.primary_ip,
1670       "NODE_SIP": self.op.secondary_ip,
1671       }
1672     nodes_0 = self.cfg.GetNodeList()
1673     nodes_1 = nodes_0 + [self.op.node_name, ]
1674     return env, nodes_0, nodes_1
1675
1676   def CheckPrereq(self):
1677     """Check prerequisites.
1678
1679     This checks:
1680      - the new node is not already in the config
1681      - it is resolvable
1682      - its parameters (single/dual homed) matches the cluster
1683
1684     Any errors are signalled by raising errors.OpPrereqError.
1685
1686     """
1687     node_name = self.op.node_name
1688     cfg = self.cfg
1689
1690     dns_data = utils.HostInfo(node_name)
1691
1692     node = dns_data.name
1693     primary_ip = self.op.primary_ip = dns_data.ip
1694     secondary_ip = getattr(self.op, "secondary_ip", None)
1695     if secondary_ip is None:
1696       secondary_ip = primary_ip
1697     if not utils.IsValidIP(secondary_ip):
1698       raise errors.OpPrereqError("Invalid secondary IP given")
1699     self.op.secondary_ip = secondary_ip
1700
1701     node_list = cfg.GetNodeList()
1702     if not self.op.readd and node in node_list:
1703       raise errors.OpPrereqError("Node %s is already in the configuration" %
1704                                  node)
1705     elif self.op.readd and node not in node_list:
1706       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1707
1708     for existing_node_name in node_list:
1709       existing_node = cfg.GetNodeInfo(existing_node_name)
1710
1711       if self.op.readd and node == existing_node_name:
1712         if (existing_node.primary_ip != primary_ip or
1713             existing_node.secondary_ip != secondary_ip):
1714           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1715                                      " address configuration as before")
1716         continue
1717
1718       if (existing_node.primary_ip == primary_ip or
1719           existing_node.secondary_ip == primary_ip or
1720           existing_node.primary_ip == secondary_ip or
1721           existing_node.secondary_ip == secondary_ip):
1722         raise errors.OpPrereqError("New node ip address(es) conflict with"
1723                                    " existing node %s" % existing_node.name)
1724
1725     # check that the type of the node (single versus dual homed) is the
1726     # same as for the master
1727     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1728     master_singlehomed = myself.secondary_ip == myself.primary_ip
1729     newbie_singlehomed = secondary_ip == primary_ip
1730     if master_singlehomed != newbie_singlehomed:
1731       if master_singlehomed:
1732         raise errors.OpPrereqError("The master has no private ip but the"
1733                                    " new node has one")
1734       else:
1735         raise errors.OpPrereqError("The master has a private ip but the"
1736                                    " new node doesn't have one")
1737
1738     # checks reachablity
1739     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1740       raise errors.OpPrereqError("Node not reachable by ping")
1741
1742     if not newbie_singlehomed:
1743       # check reachability from my secondary ip to newbie's secondary ip
1744       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1745                            source=myself.secondary_ip):
1746         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1747                                    " based ping to noded port")
1748
1749     self.new_node = objects.Node(name=node,
1750                                  primary_ip=primary_ip,
1751                                  secondary_ip=secondary_ip)
1752
1753     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1754       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1755         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1756                                    constants.VNC_PASSWORD_FILE)
1757
1758   def Exec(self, feedback_fn):
1759     """Adds the new node to the cluster.
1760
1761     """
1762     new_node = self.new_node
1763     node = new_node.name
1764
1765     # set up inter-node password and certificate and restarts the node daemon
1766     gntpass = self.sstore.GetNodeDaemonPassword()
1767     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1768       raise errors.OpExecError("ganeti password corruption detected")
1769     f = open(constants.SSL_CERT_FILE)
1770     try:
1771       gntpem = f.read(8192)
1772     finally:
1773       f.close()
1774     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1775     # so we use this to detect an invalid certificate; as long as the
1776     # cert doesn't contain this, the here-document will be correctly
1777     # parsed by the shell sequence below
1778     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1779       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1780     if not gntpem.endswith("\n"):
1781       raise errors.OpExecError("PEM must end with newline")
1782     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1783
1784     # and then connect with ssh to set password and start ganeti-noded
1785     # note that all the below variables are sanitized at this point,
1786     # either by being constants or by the checks above
1787     ss = self.sstore
1788     mycommand = ("umask 077 && "
1789                  "echo '%s' > '%s' && "
1790                  "cat > '%s' << '!EOF.' && \n"
1791                  "%s!EOF.\n%s restart" %
1792                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1793                   constants.SSL_CERT_FILE, gntpem,
1794                   constants.NODE_INITD_SCRIPT))
1795
1796     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1797     if result.failed:
1798       raise errors.OpExecError("Remote command on node %s, error: %s,"
1799                                " output: %s" %
1800                                (node, result.fail_reason, result.output))
1801
1802     # check connectivity
1803     time.sleep(4)
1804
1805     result = rpc.call_version([node])[node]
1806     if result:
1807       if constants.PROTOCOL_VERSION == result:
1808         logger.Info("communication to node %s fine, sw version %s match" %
1809                     (node, result))
1810       else:
1811         raise errors.OpExecError("Version mismatch master version %s,"
1812                                  " node version %s" %
1813                                  (constants.PROTOCOL_VERSION, result))
1814     else:
1815       raise errors.OpExecError("Cannot get version from the new node")
1816
1817     # setup ssh on node
1818     logger.Info("copy ssh key to node %s" % node)
1819     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1820     keyarray = []
1821     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1822                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1823                 priv_key, pub_key]
1824
1825     for i in keyfiles:
1826       f = open(i, 'r')
1827       try:
1828         keyarray.append(f.read())
1829       finally:
1830         f.close()
1831
1832     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1833                                keyarray[3], keyarray[4], keyarray[5])
1834
1835     if not result:
1836       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1837
1838     # Add node to our /etc/hosts, and add key to known_hosts
1839     _AddHostToEtcHosts(new_node.name)
1840
1841     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1842                       self.cfg.GetHostKey())
1843
1844     if new_node.secondary_ip != new_node.primary_ip:
1845       if not rpc.call_node_tcp_ping(new_node.name,
1846                                     constants.LOCALHOST_IP_ADDRESS,
1847                                     new_node.secondary_ip,
1848                                     constants.DEFAULT_NODED_PORT,
1849                                     10, False):
1850         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1851                                  " you gave (%s). Please fix and re-run this"
1852                                  " command." % new_node.secondary_ip)
1853
1854     success, msg = ssh.VerifyNodeHostname(node)
1855     if not success:
1856       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1857                                " than the one the resolver gives: %s."
1858                                " Please fix and re-run this command." %
1859                                (node, msg))
1860
1861     # Distribute updated /etc/hosts and known_hosts to all nodes,
1862     # including the node just added
1863     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1864     dist_nodes = self.cfg.GetNodeList()
1865     if not self.op.readd:
1866       dist_nodes.append(node)
1867     if myself.name in dist_nodes:
1868       dist_nodes.remove(myself.name)
1869
1870     logger.Debug("Copying hosts and known_hosts to all nodes")
1871     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1872       result = rpc.call_upload_file(dist_nodes, fname)
1873       for to_node in dist_nodes:
1874         if not result[to_node]:
1875           logger.Error("copy of file %s to node %s failed" %
1876                        (fname, to_node))
1877
1878     to_copy = ss.GetFileList()
1879     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1880       to_copy.append(constants.VNC_PASSWORD_FILE)
1881     for fname in to_copy:
1882       if not ssh.CopyFileToNode(node, fname):
1883         logger.Error("could not copy file %s to node %s" % (fname, node))
1884
1885     if not self.op.readd:
1886       logger.Info("adding node %s to cluster.conf" % node)
1887       self.cfg.AddNode(new_node)
1888
1889
1890 class LUMasterFailover(LogicalUnit):
1891   """Failover the master node to the current node.
1892
1893   This is a special LU in that it must run on a non-master node.
1894
1895   """
1896   HPATH = "master-failover"
1897   HTYPE = constants.HTYPE_CLUSTER
1898   REQ_MASTER = False
1899   _OP_REQP = []
1900
1901   def BuildHooksEnv(self):
1902     """Build hooks env.
1903
1904     This will run on the new master only in the pre phase, and on all
1905     the nodes in the post phase.
1906
1907     """
1908     env = {
1909       "OP_TARGET": self.new_master,
1910       "NEW_MASTER": self.new_master,
1911       "OLD_MASTER": self.old_master,
1912       }
1913     return env, [self.new_master], self.cfg.GetNodeList()
1914
1915   def CheckPrereq(self):
1916     """Check prerequisites.
1917
1918     This checks that we are not already the master.
1919
1920     """
1921     self.new_master = utils.HostInfo().name
1922     self.old_master = self.sstore.GetMasterNode()
1923
1924     if self.old_master == self.new_master:
1925       raise errors.OpPrereqError("This commands must be run on the node"
1926                                  " where you want the new master to be."
1927                                  " %s is already the master" %
1928                                  self.old_master)
1929
1930   def Exec(self, feedback_fn):
1931     """Failover the master node.
1932
1933     This command, when run on a non-master node, will cause the current
1934     master to cease being master, and the non-master to become new
1935     master.
1936
1937     """
1938     #TODO: do not rely on gethostname returning the FQDN
1939     logger.Info("setting master to %s, old master: %s" %
1940                 (self.new_master, self.old_master))
1941
1942     if not rpc.call_node_stop_master(self.old_master):
1943       logger.Error("could disable the master role on the old master"
1944                    " %s, please disable manually" % self.old_master)
1945
1946     ss = self.sstore
1947     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1948     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1949                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1950       logger.Error("could not distribute the new simple store master file"
1951                    " to the other nodes, please check.")
1952
1953     if not rpc.call_node_start_master(self.new_master):
1954       logger.Error("could not start the master role on the new master"
1955                    " %s, please check" % self.new_master)
1956       feedback_fn("Error in activating the master IP on the new master,"
1957                   " please fix manually.")
1958
1959
1960
1961 class LUQueryClusterInfo(NoHooksLU):
1962   """Query cluster configuration.
1963
1964   """
1965   _OP_REQP = []
1966   REQ_MASTER = False
1967
1968   def CheckPrereq(self):
1969     """No prerequsites needed for this LU.
1970
1971     """
1972     pass
1973
1974   def Exec(self, feedback_fn):
1975     """Return cluster config.
1976
1977     """
1978     result = {
1979       "name": self.sstore.GetClusterName(),
1980       "software_version": constants.RELEASE_VERSION,
1981       "protocol_version": constants.PROTOCOL_VERSION,
1982       "config_version": constants.CONFIG_VERSION,
1983       "os_api_version": constants.OS_API_VERSION,
1984       "export_version": constants.EXPORT_VERSION,
1985       "master": self.sstore.GetMasterNode(),
1986       "architecture": (platform.architecture()[0], platform.machine()),
1987       "hypervisor_type": self.sstore.GetHypervisorType(),
1988       }
1989
1990     return result
1991
1992
1993 class LUClusterCopyFile(NoHooksLU):
1994   """Copy file to cluster.
1995
1996   """
1997   _OP_REQP = ["nodes", "filename"]
1998
1999   def CheckPrereq(self):
2000     """Check prerequisites.
2001
2002     It should check that the named file exists and that the given list
2003     of nodes is valid.
2004
2005     """
2006     if not os.path.exists(self.op.filename):
2007       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2008
2009     self.nodes = _GetWantedNodes(self, self.op.nodes)
2010
2011   def Exec(self, feedback_fn):
2012     """Copy a file from master to some nodes.
2013
2014     Args:
2015       opts - class with options as members
2016       args - list containing a single element, the file name
2017     Opts used:
2018       nodes - list containing the name of target nodes; if empty, all nodes
2019
2020     """
2021     filename = self.op.filename
2022
2023     myname = utils.HostInfo().name
2024
2025     for node in self.nodes:
2026       if node == myname:
2027         continue
2028       if not ssh.CopyFileToNode(node, filename):
2029         logger.Error("Copy of file %s to node %s failed" % (filename, node))
2030
2031
2032 class LUDumpClusterConfig(NoHooksLU):
2033   """Return a text-representation of the cluster-config.
2034
2035   """
2036   _OP_REQP = []
2037
2038   def CheckPrereq(self):
2039     """No prerequisites.
2040
2041     """
2042     pass
2043
2044   def Exec(self, feedback_fn):
2045     """Dump a representation of the cluster config to the standard output.
2046
2047     """
2048     return self.cfg.DumpConfig()
2049
2050
2051 class LURunClusterCommand(NoHooksLU):
2052   """Run a command on some nodes.
2053
2054   """
2055   _OP_REQP = ["command", "nodes"]
2056
2057   def CheckPrereq(self):
2058     """Check prerequisites.
2059
2060     It checks that the given list of nodes is valid.
2061
2062     """
2063     self.nodes = _GetWantedNodes(self, self.op.nodes)
2064
2065   def Exec(self, feedback_fn):
2066     """Run a command on some nodes.
2067
2068     """
2069     # put the master at the end of the nodes list
2070     master_node = self.sstore.GetMasterNode()
2071     if master_node in self.nodes:
2072       self.nodes.remove(master_node)
2073       self.nodes.append(master_node)
2074
2075     data = []
2076     for node in self.nodes:
2077       result = ssh.SSHCall(node, "root", self.op.command)
2078       data.append((node, result.output, result.exit_code))
2079
2080     return data
2081
2082
2083 class LUActivateInstanceDisks(NoHooksLU):
2084   """Bring up an instance's disks.
2085
2086   """
2087   _OP_REQP = ["instance_name"]
2088
2089   def CheckPrereq(self):
2090     """Check prerequisites.
2091
2092     This checks that the instance is in the cluster.
2093
2094     """
2095     instance = self.cfg.GetInstanceInfo(
2096       self.cfg.ExpandInstanceName(self.op.instance_name))
2097     if instance is None:
2098       raise errors.OpPrereqError("Instance '%s' not known" %
2099                                  self.op.instance_name)
2100     self.instance = instance
2101
2102
2103   def Exec(self, feedback_fn):
2104     """Activate the disks.
2105
2106     """
2107     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2108     if not disks_ok:
2109       raise errors.OpExecError("Cannot activate block devices")
2110
2111     return disks_info
2112
2113
2114 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2115   """Prepare the block devices for an instance.
2116
2117   This sets up the block devices on all nodes.
2118
2119   Args:
2120     instance: a ganeti.objects.Instance object
2121     ignore_secondaries: if true, errors on secondary nodes won't result
2122                         in an error return from the function
2123
2124   Returns:
2125     false if the operation failed
2126     list of (host, instance_visible_name, node_visible_name) if the operation
2127          suceeded with the mapping from node devices to instance devices
2128   """
2129   device_info = []
2130   disks_ok = True
2131   iname = instance.name
2132   # With the two passes mechanism we try to reduce the window of
2133   # opportunity for the race condition of switching DRBD to primary
2134   # before handshaking occured, but we do not eliminate it
2135
2136   # The proper fix would be to wait (with some limits) until the
2137   # connection has been made and drbd transitions from WFConnection
2138   # into any other network-connected state (Connected, SyncTarget,
2139   # SyncSource, etc.)
2140
2141   # 1st pass, assemble on all nodes in secondary mode
2142   for inst_disk in instance.disks:
2143     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2144       cfg.SetDiskID(node_disk, node)
2145       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2146       if not result:
2147         logger.Error("could not prepare block device %s on node %s"
2148                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2149         if not ignore_secondaries:
2150           disks_ok = False
2151
2152   # FIXME: race condition on drbd migration to primary
2153
2154   # 2nd pass, do only the primary node
2155   for inst_disk in instance.disks:
2156     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2157       if node != instance.primary_node:
2158         continue
2159       cfg.SetDiskID(node_disk, node)
2160       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2161       if not result:
2162         logger.Error("could not prepare block device %s on node %s"
2163                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2164         disks_ok = False
2165     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2166
2167   # leave the disks configured for the primary node
2168   # this is a workaround that would be fixed better by
2169   # improving the logical/physical id handling
2170   for disk in instance.disks:
2171     cfg.SetDiskID(disk, instance.primary_node)
2172
2173   return disks_ok, device_info
2174
2175
2176 def _StartInstanceDisks(cfg, instance, force):
2177   """Start the disks of an instance.
2178
2179   """
2180   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2181                                            ignore_secondaries=force)
2182   if not disks_ok:
2183     _ShutdownInstanceDisks(instance, cfg)
2184     if force is not None and not force:
2185       logger.Error("If the message above refers to a secondary node,"
2186                    " you can retry the operation using '--force'.")
2187     raise errors.OpExecError("Disk consistency error")
2188
2189
2190 class LUDeactivateInstanceDisks(NoHooksLU):
2191   """Shutdown an instance's disks.
2192
2193   """
2194   _OP_REQP = ["instance_name"]
2195
2196   def CheckPrereq(self):
2197     """Check prerequisites.
2198
2199     This checks that the instance is in the cluster.
2200
2201     """
2202     instance = self.cfg.GetInstanceInfo(
2203       self.cfg.ExpandInstanceName(self.op.instance_name))
2204     if instance is None:
2205       raise errors.OpPrereqError("Instance '%s' not known" %
2206                                  self.op.instance_name)
2207     self.instance = instance
2208
2209   def Exec(self, feedback_fn):
2210     """Deactivate the disks
2211
2212     """
2213     instance = self.instance
2214     ins_l = rpc.call_instance_list([instance.primary_node])
2215     ins_l = ins_l[instance.primary_node]
2216     if not type(ins_l) is list:
2217       raise errors.OpExecError("Can't contact node '%s'" %
2218                                instance.primary_node)
2219
2220     if self.instance.name in ins_l:
2221       raise errors.OpExecError("Instance is running, can't shutdown"
2222                                " block devices.")
2223
2224     _ShutdownInstanceDisks(instance, self.cfg)
2225
2226
2227 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2228   """Shutdown block devices of an instance.
2229
2230   This does the shutdown on all nodes of the instance.
2231
2232   If the ignore_primary is false, errors on the primary node are
2233   ignored.
2234
2235   """
2236   result = True
2237   for disk in instance.disks:
2238     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2239       cfg.SetDiskID(top_disk, node)
2240       if not rpc.call_blockdev_shutdown(node, top_disk):
2241         logger.Error("could not shutdown block device %s on node %s" %
2242                      (disk.iv_name, node))
2243         if not ignore_primary or node != instance.primary_node:
2244           result = False
2245   return result
2246
2247
2248 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2249   """Checks if a node has enough free memory.
2250
2251   This function check if a given node has the needed amount of free
2252   memory. In case the node has less memory or we cannot get the
2253   information from the node, this function raise an OpPrereqError
2254   exception.
2255
2256   Args:
2257     - cfg: a ConfigWriter instance
2258     - node: the node name
2259     - reason: string to use in the error message
2260     - requested: the amount of memory in MiB
2261
2262   """
2263   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2264   if not nodeinfo or not isinstance(nodeinfo, dict):
2265     raise errors.OpPrereqError("Could not contact node %s for resource"
2266                              " information" % (node,))
2267
2268   free_mem = nodeinfo[node].get('memory_free')
2269   if not isinstance(free_mem, int):
2270     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2271                              " was '%s'" % (node, free_mem))
2272   if requested > free_mem:
2273     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2274                              " needed %s MiB, available %s MiB" %
2275                              (node, reason, requested, free_mem))
2276
2277
2278 class LUStartupInstance(LogicalUnit):
2279   """Starts an instance.
2280
2281   """
2282   HPATH = "instance-start"
2283   HTYPE = constants.HTYPE_INSTANCE
2284   _OP_REQP = ["instance_name", "force"]
2285
2286   def BuildHooksEnv(self):
2287     """Build hooks env.
2288
2289     This runs on master, primary and secondary nodes of the instance.
2290
2291     """
2292     env = {
2293       "FORCE": self.op.force,
2294       }
2295     env.update(_BuildInstanceHookEnvByObject(self.instance))
2296     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2297           list(self.instance.secondary_nodes))
2298     return env, nl, nl
2299
2300   def CheckPrereq(self):
2301     """Check prerequisites.
2302
2303     This checks that the instance is in the cluster.
2304
2305     """
2306     instance = self.cfg.GetInstanceInfo(
2307       self.cfg.ExpandInstanceName(self.op.instance_name))
2308     if instance is None:
2309       raise errors.OpPrereqError("Instance '%s' not known" %
2310                                  self.op.instance_name)
2311
2312     # check bridges existance
2313     _CheckInstanceBridgesExist(instance)
2314
2315     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2316                          "starting instance %s" % instance.name,
2317                          instance.memory)
2318
2319     self.instance = instance
2320     self.op.instance_name = instance.name
2321
2322   def Exec(self, feedback_fn):
2323     """Start the instance.
2324
2325     """
2326     instance = self.instance
2327     force = self.op.force
2328     extra_args = getattr(self.op, "extra_args", "")
2329
2330     self.cfg.MarkInstanceUp(instance.name)
2331
2332     node_current = instance.primary_node
2333
2334     _StartInstanceDisks(self.cfg, instance, force)
2335
2336     if not rpc.call_instance_start(node_current, instance, extra_args):
2337       _ShutdownInstanceDisks(instance, self.cfg)
2338       raise errors.OpExecError("Could not start instance")
2339
2340
2341 class LURebootInstance(LogicalUnit):
2342   """Reboot an instance.
2343
2344   """
2345   HPATH = "instance-reboot"
2346   HTYPE = constants.HTYPE_INSTANCE
2347   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2348
2349   def BuildHooksEnv(self):
2350     """Build hooks env.
2351
2352     This runs on master, primary and secondary nodes of the instance.
2353
2354     """
2355     env = {
2356       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2357       }
2358     env.update(_BuildInstanceHookEnvByObject(self.instance))
2359     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2360           list(self.instance.secondary_nodes))
2361     return env, nl, nl
2362
2363   def CheckPrereq(self):
2364     """Check prerequisites.
2365
2366     This checks that the instance is in the cluster.
2367
2368     """
2369     instance = self.cfg.GetInstanceInfo(
2370       self.cfg.ExpandInstanceName(self.op.instance_name))
2371     if instance is None:
2372       raise errors.OpPrereqError("Instance '%s' not known" %
2373                                  self.op.instance_name)
2374
2375     # check bridges existance
2376     _CheckInstanceBridgesExist(instance)
2377
2378     self.instance = instance
2379     self.op.instance_name = instance.name
2380
2381   def Exec(self, feedback_fn):
2382     """Reboot the instance.
2383
2384     """
2385     instance = self.instance
2386     ignore_secondaries = self.op.ignore_secondaries
2387     reboot_type = self.op.reboot_type
2388     extra_args = getattr(self.op, "extra_args", "")
2389
2390     node_current = instance.primary_node
2391
2392     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2393                            constants.INSTANCE_REBOOT_HARD,
2394                            constants.INSTANCE_REBOOT_FULL]:
2395       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2396                                   (constants.INSTANCE_REBOOT_SOFT,
2397                                    constants.INSTANCE_REBOOT_HARD,
2398                                    constants.INSTANCE_REBOOT_FULL))
2399
2400     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2401                        constants.INSTANCE_REBOOT_HARD]:
2402       if not rpc.call_instance_reboot(node_current, instance,
2403                                       reboot_type, extra_args):
2404         raise errors.OpExecError("Could not reboot instance")
2405     else:
2406       if not rpc.call_instance_shutdown(node_current, instance):
2407         raise errors.OpExecError("could not shutdown instance for full reboot")
2408       _ShutdownInstanceDisks(instance, self.cfg)
2409       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2410       if not rpc.call_instance_start(node_current, instance, extra_args):
2411         _ShutdownInstanceDisks(instance, self.cfg)
2412         raise errors.OpExecError("Could not start instance for full reboot")
2413
2414     self.cfg.MarkInstanceUp(instance.name)
2415
2416
2417 class LUShutdownInstance(LogicalUnit):
2418   """Shutdown an instance.
2419
2420   """
2421   HPATH = "instance-stop"
2422   HTYPE = constants.HTYPE_INSTANCE
2423   _OP_REQP = ["instance_name"]
2424
2425   def BuildHooksEnv(self):
2426     """Build hooks env.
2427
2428     This runs on master, primary and secondary nodes of the instance.
2429
2430     """
2431     env = _BuildInstanceHookEnvByObject(self.instance)
2432     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2433           list(self.instance.secondary_nodes))
2434     return env, nl, nl
2435
2436   def CheckPrereq(self):
2437     """Check prerequisites.
2438
2439     This checks that the instance is in the cluster.
2440
2441     """
2442     instance = self.cfg.GetInstanceInfo(
2443       self.cfg.ExpandInstanceName(self.op.instance_name))
2444     if instance is None:
2445       raise errors.OpPrereqError("Instance '%s' not known" %
2446                                  self.op.instance_name)
2447     self.instance = instance
2448
2449   def Exec(self, feedback_fn):
2450     """Shutdown the instance.
2451
2452     """
2453     instance = self.instance
2454     node_current = instance.primary_node
2455     self.cfg.MarkInstanceDown(instance.name)
2456     if not rpc.call_instance_shutdown(node_current, instance):
2457       logger.Error("could not shutdown instance")
2458
2459     _ShutdownInstanceDisks(instance, self.cfg)
2460
2461
2462 class LUReinstallInstance(LogicalUnit):
2463   """Reinstall an instance.
2464
2465   """
2466   HPATH = "instance-reinstall"
2467   HTYPE = constants.HTYPE_INSTANCE
2468   _OP_REQP = ["instance_name"]
2469
2470   def BuildHooksEnv(self):
2471     """Build hooks env.
2472
2473     This runs on master, primary and secondary nodes of the instance.
2474
2475     """
2476     env = _BuildInstanceHookEnvByObject(self.instance)
2477     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2478           list(self.instance.secondary_nodes))
2479     return env, nl, nl
2480
2481   def CheckPrereq(self):
2482     """Check prerequisites.
2483
2484     This checks that the instance is in the cluster and is not running.
2485
2486     """
2487     instance = self.cfg.GetInstanceInfo(
2488       self.cfg.ExpandInstanceName(self.op.instance_name))
2489     if instance is None:
2490       raise errors.OpPrereqError("Instance '%s' not known" %
2491                                  self.op.instance_name)
2492     if instance.disk_template == constants.DT_DISKLESS:
2493       raise errors.OpPrereqError("Instance '%s' has no disks" %
2494                                  self.op.instance_name)
2495     if instance.status != "down":
2496       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2497                                  self.op.instance_name)
2498     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2499     if remote_info:
2500       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2501                                  (self.op.instance_name,
2502                                   instance.primary_node))
2503
2504     self.op.os_type = getattr(self.op, "os_type", None)
2505     if self.op.os_type is not None:
2506       # OS verification
2507       pnode = self.cfg.GetNodeInfo(
2508         self.cfg.ExpandNodeName(instance.primary_node))
2509       if pnode is None:
2510         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2511                                    self.op.pnode)
2512       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2513       if not os_obj:
2514         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2515                                    " primary node"  % self.op.os_type)
2516
2517     self.instance = instance
2518
2519   def Exec(self, feedback_fn):
2520     """Reinstall the instance.
2521
2522     """
2523     inst = self.instance
2524
2525     if self.op.os_type is not None:
2526       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2527       inst.os = self.op.os_type
2528       self.cfg.AddInstance(inst)
2529
2530     _StartInstanceDisks(self.cfg, inst, None)
2531     try:
2532       feedback_fn("Running the instance OS create scripts...")
2533       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2534         raise errors.OpExecError("Could not install OS for instance %s"
2535                                  " on node %s" %
2536                                  (inst.name, inst.primary_node))
2537     finally:
2538       _ShutdownInstanceDisks(inst, self.cfg)
2539
2540
2541 class LURenameInstance(LogicalUnit):
2542   """Rename an instance.
2543
2544   """
2545   HPATH = "instance-rename"
2546   HTYPE = constants.HTYPE_INSTANCE
2547   _OP_REQP = ["instance_name", "new_name"]
2548
2549   def BuildHooksEnv(self):
2550     """Build hooks env.
2551
2552     This runs on master, primary and secondary nodes of the instance.
2553
2554     """
2555     env = _BuildInstanceHookEnvByObject(self.instance)
2556     env["INSTANCE_NEW_NAME"] = self.op.new_name
2557     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2558           list(self.instance.secondary_nodes))
2559     return env, nl, nl
2560
2561   def CheckPrereq(self):
2562     """Check prerequisites.
2563
2564     This checks that the instance is in the cluster and is not running.
2565
2566     """
2567     instance = self.cfg.GetInstanceInfo(
2568       self.cfg.ExpandInstanceName(self.op.instance_name))
2569     if instance is None:
2570       raise errors.OpPrereqError("Instance '%s' not known" %
2571                                  self.op.instance_name)
2572     if instance.status != "down":
2573       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2574                                  self.op.instance_name)
2575     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2576     if remote_info:
2577       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2578                                  (self.op.instance_name,
2579                                   instance.primary_node))
2580     self.instance = instance
2581
2582     # new name verification
2583     name_info = utils.HostInfo(self.op.new_name)
2584
2585     self.op.new_name = new_name = name_info.name
2586     instance_list = self.cfg.GetInstanceList()
2587     if new_name in instance_list:
2588       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2589                                  new_name)
2590
2591     if not getattr(self.op, "ignore_ip", False):
2592       command = ["fping", "-q", name_info.ip]
2593       result = utils.RunCmd(command)
2594       if not result.failed:
2595         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2596                                    (name_info.ip, new_name))
2597
2598
2599   def Exec(self, feedback_fn):
2600     """Reinstall the instance.
2601
2602     """
2603     inst = self.instance
2604     old_name = inst.name
2605
2606     self.cfg.RenameInstance(inst.name, self.op.new_name)
2607
2608     # re-read the instance from the configuration after rename
2609     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2610
2611     _StartInstanceDisks(self.cfg, inst, None)
2612     try:
2613       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2614                                           "sda", "sdb"):
2615         msg = ("Could run OS rename script for instance %s on node %s (but the"
2616                " instance has been renamed in Ganeti)" %
2617                (inst.name, inst.primary_node))
2618         logger.Error(msg)
2619     finally:
2620       _ShutdownInstanceDisks(inst, self.cfg)
2621
2622
2623 class LURemoveInstance(LogicalUnit):
2624   """Remove an instance.
2625
2626   """
2627   HPATH = "instance-remove"
2628   HTYPE = constants.HTYPE_INSTANCE
2629   _OP_REQP = ["instance_name", "ignore_failures"]
2630
2631   def BuildHooksEnv(self):
2632     """Build hooks env.
2633
2634     This runs on master, primary and secondary nodes of the instance.
2635
2636     """
2637     env = _BuildInstanceHookEnvByObject(self.instance)
2638     nl = [self.sstore.GetMasterNode()]
2639     return env, nl, nl
2640
2641   def CheckPrereq(self):
2642     """Check prerequisites.
2643
2644     This checks that the instance is in the cluster.
2645
2646     """
2647     instance = self.cfg.GetInstanceInfo(
2648       self.cfg.ExpandInstanceName(self.op.instance_name))
2649     if instance is None:
2650       raise errors.OpPrereqError("Instance '%s' not known" %
2651                                  self.op.instance_name)
2652     self.instance = instance
2653
2654   def Exec(self, feedback_fn):
2655     """Remove the instance.
2656
2657     """
2658     instance = self.instance
2659     logger.Info("shutting down instance %s on node %s" %
2660                 (instance.name, instance.primary_node))
2661
2662     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2663       if self.op.ignore_failures:
2664         feedback_fn("Warning: can't shutdown instance")
2665       else:
2666         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2667                                  (instance.name, instance.primary_node))
2668
2669     logger.Info("removing block devices for instance %s" % instance.name)
2670
2671     if not _RemoveDisks(instance, self.cfg):
2672       if self.op.ignore_failures:
2673         feedback_fn("Warning: can't remove instance's disks")
2674       else:
2675         raise errors.OpExecError("Can't remove instance's disks")
2676
2677     logger.Info("removing instance %s out of cluster config" % instance.name)
2678
2679     self.cfg.RemoveInstance(instance.name)
2680
2681
2682 class LUQueryInstances(NoHooksLU):
2683   """Logical unit for querying instances.
2684
2685   """
2686   _OP_REQP = ["output_fields", "names"]
2687
2688   def CheckPrereq(self):
2689     """Check prerequisites.
2690
2691     This checks that the fields required are valid output fields.
2692
2693     """
2694     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2695     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2696                                "admin_state", "admin_ram",
2697                                "disk_template", "ip", "mac", "bridge",
2698                                "sda_size", "sdb_size", "vcpus", "tags"],
2699                        dynamic=self.dynamic_fields,
2700                        selected=self.op.output_fields)
2701
2702     self.wanted = _GetWantedInstances(self, self.op.names)
2703
2704   def Exec(self, feedback_fn):
2705     """Computes the list of nodes and their attributes.
2706
2707     """
2708     instance_names = self.wanted
2709     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2710                      in instance_names]
2711
2712     # begin data gathering
2713
2714     nodes = frozenset([inst.primary_node for inst in instance_list])
2715
2716     bad_nodes = []
2717     if self.dynamic_fields.intersection(self.op.output_fields):
2718       live_data = {}
2719       node_data = rpc.call_all_instances_info(nodes)
2720       for name in nodes:
2721         result = node_data[name]
2722         if result:
2723           live_data.update(result)
2724         elif result == False:
2725           bad_nodes.append(name)
2726         # else no instance is alive
2727     else:
2728       live_data = dict([(name, {}) for name in instance_names])
2729
2730     # end data gathering
2731
2732     output = []
2733     for instance in instance_list:
2734       iout = []
2735       for field in self.op.output_fields:
2736         if field == "name":
2737           val = instance.name
2738         elif field == "os":
2739           val = instance.os
2740         elif field == "pnode":
2741           val = instance.primary_node
2742         elif field == "snodes":
2743           val = list(instance.secondary_nodes)
2744         elif field == "admin_state":
2745           val = (instance.status != "down")
2746         elif field == "oper_state":
2747           if instance.primary_node in bad_nodes:
2748             val = None
2749           else:
2750             val = bool(live_data.get(instance.name))
2751         elif field == "status":
2752           if instance.primary_node in bad_nodes:
2753             val = "ERROR_nodedown"
2754           else:
2755             running = bool(live_data.get(instance.name))
2756             if running:
2757               if instance.status != "down":
2758                 val = "running"
2759               else:
2760                 val = "ERROR_up"
2761             else:
2762               if instance.status != "down":
2763                 val = "ERROR_down"
2764               else:
2765                 val = "ADMIN_down"
2766         elif field == "admin_ram":
2767           val = instance.memory
2768         elif field == "oper_ram":
2769           if instance.primary_node in bad_nodes:
2770             val = None
2771           elif instance.name in live_data:
2772             val = live_data[instance.name].get("memory", "?")
2773           else:
2774             val = "-"
2775         elif field == "disk_template":
2776           val = instance.disk_template
2777         elif field == "ip":
2778           val = instance.nics[0].ip
2779         elif field == "bridge":
2780           val = instance.nics[0].bridge
2781         elif field == "mac":
2782           val = instance.nics[0].mac
2783         elif field == "sda_size" or field == "sdb_size":
2784           disk = instance.FindDisk(field[:3])
2785           if disk is None:
2786             val = None
2787           else:
2788             val = disk.size
2789         elif field == "vcpus":
2790           val = instance.vcpus
2791         elif field == "tags":
2792           val = list(instance.GetTags())
2793         else:
2794           raise errors.ParameterError(field)
2795         iout.append(val)
2796       output.append(iout)
2797
2798     return output
2799
2800
2801 class LUFailoverInstance(LogicalUnit):
2802   """Failover an instance.
2803
2804   """
2805   HPATH = "instance-failover"
2806   HTYPE = constants.HTYPE_INSTANCE
2807   _OP_REQP = ["instance_name", "ignore_consistency"]
2808
2809   def BuildHooksEnv(self):
2810     """Build hooks env.
2811
2812     This runs on master, primary and secondary nodes of the instance.
2813
2814     """
2815     env = {
2816       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2817       }
2818     env.update(_BuildInstanceHookEnvByObject(self.instance))
2819     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2820     return env, nl, nl
2821
2822   def CheckPrereq(self):
2823     """Check prerequisites.
2824
2825     This checks that the instance is in the cluster.
2826
2827     """
2828     instance = self.cfg.GetInstanceInfo(
2829       self.cfg.ExpandInstanceName(self.op.instance_name))
2830     if instance is None:
2831       raise errors.OpPrereqError("Instance '%s' not known" %
2832                                  self.op.instance_name)
2833
2834     if instance.disk_template not in constants.DTS_NET_MIRROR:
2835       raise errors.OpPrereqError("Instance's disk layout is not"
2836                                  " network mirrored, cannot failover.")
2837
2838     secondary_nodes = instance.secondary_nodes
2839     if not secondary_nodes:
2840       raise errors.ProgrammerError("no secondary node but using "
2841                                    "DT_REMOTE_RAID1 template")
2842
2843     target_node = secondary_nodes[0]
2844     # check memory requirements on the secondary node
2845     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2846                          instance.name, instance.memory)
2847
2848     # check bridge existance
2849     brlist = [nic.bridge for nic in instance.nics]
2850     if not rpc.call_bridges_exist(target_node, brlist):
2851       raise errors.OpPrereqError("One or more target bridges %s does not"
2852                                  " exist on destination node '%s'" %
2853                                  (brlist, target_node))
2854
2855     self.instance = instance
2856
2857   def Exec(self, feedback_fn):
2858     """Failover an instance.
2859
2860     The failover is done by shutting it down on its present node and
2861     starting it on the secondary.
2862
2863     """
2864     instance = self.instance
2865
2866     source_node = instance.primary_node
2867     target_node = instance.secondary_nodes[0]
2868
2869     feedback_fn("* checking disk consistency between source and target")
2870     for dev in instance.disks:
2871       # for remote_raid1, these are md over drbd
2872       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2873         if instance.status == "up" and not self.op.ignore_consistency:
2874           raise errors.OpExecError("Disk %s is degraded on target node,"
2875                                    " aborting failover." % dev.iv_name)
2876
2877     feedback_fn("* shutting down instance on source node")
2878     logger.Info("Shutting down instance %s on node %s" %
2879                 (instance.name, source_node))
2880
2881     if not rpc.call_instance_shutdown(source_node, instance):
2882       if self.op.ignore_consistency:
2883         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2884                      " anyway. Please make sure node %s is down"  %
2885                      (instance.name, source_node, source_node))
2886       else:
2887         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2888                                  (instance.name, source_node))
2889
2890     feedback_fn("* deactivating the instance's disks on source node")
2891     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2892       raise errors.OpExecError("Can't shut down the instance's disks.")
2893
2894     instance.primary_node = target_node
2895     # distribute new instance config to the other nodes
2896     self.cfg.Update(instance)
2897
2898     # Only start the instance if it's marked as up
2899     if instance.status == "up":
2900       feedback_fn("* activating the instance's disks on target node")
2901       logger.Info("Starting instance %s on node %s" %
2902                   (instance.name, target_node))
2903
2904       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2905                                                ignore_secondaries=True)
2906       if not disks_ok:
2907         _ShutdownInstanceDisks(instance, self.cfg)
2908         raise errors.OpExecError("Can't activate the instance's disks")
2909
2910       feedback_fn("* starting the instance on the target node")
2911       if not rpc.call_instance_start(target_node, instance, None):
2912         _ShutdownInstanceDisks(instance, self.cfg)
2913         raise errors.OpExecError("Could not start instance %s on node %s." %
2914                                  (instance.name, target_node))
2915
2916
2917 class LUMigrateInstance(LogicalUnit):
2918   """Migrate an instance.
2919
2920   This is migration without shutting down, compared to the failover,
2921   which is done with shutdown.
2922
2923   """
2924   HPATH = "instance-migrate"
2925   HTYPE = constants.HTYPE_INSTANCE
2926   _OP_REQP = ["instance_name", "live"]
2927
2928   def BuildHooksEnv(self):
2929     """Build hooks env.
2930
2931     This runs on master, primary and secondary nodes of the instance.
2932
2933     """
2934     env = _BuildInstanceHookEnvByObject(self.instance)
2935     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2936     return env, nl, nl
2937
2938   def CheckPrereq(self):
2939     """Check prerequisites.
2940
2941     This checks that the instance is in the cluster.
2942
2943     """
2944     instance = self.cfg.GetInstanceInfo(
2945       self.cfg.ExpandInstanceName(self.op.instance_name))
2946     if instance is None:
2947       raise errors.OpPrereqError("Instance '%s' not known" %
2948                                  self.op.instance_name)
2949
2950     if instance.disk_template != constants.DT_DRBD8:
2951       raise errors.OpPrereqError("Instance's disk layout is not"
2952                                  " drbd8, cannot migrate.")
2953
2954     secondary_nodes = instance.secondary_nodes
2955     if not secondary_nodes:
2956       raise errors.ProgrammerError("no secondary node but using "
2957                                    "drbd8 disk template")
2958
2959     target_node = secondary_nodes[0]
2960     # check memory requirements on the secondary node
2961     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2962                          instance.name, instance.memory)
2963
2964     # check bridge existance
2965     brlist = [nic.bridge for nic in instance.nics]
2966     if not rpc.call_bridges_exist(target_node, brlist):
2967       raise errors.OpPrereqError("One or more target bridges %s does not"
2968                                  " exist on destination node '%s'" %
2969                                  (brlist, target_node))
2970
2971     migratable = rpc.call_instance_migratable(instance.primary_node, instance)
2972     if not migratable:
2973       raise errors.OpPrereqError("Can't contact node '%s'" %
2974                                  instance.primary_node)
2975     if not migratable[0]:
2976       raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
2977                                  migratable[1])
2978
2979     self.instance = instance
2980
2981   def Exec(self, feedback_fn):
2982     """Migrate an instance.
2983
2984     The migrate is done by:
2985       - change the disks into dual-master mode
2986       - wait until disks are fully synchronized again
2987       - migrate the instance
2988       - change disks on the new secondary node (the old primary) to secondary
2989       - wait until disks are fully synchronized
2990       - change disks into single-master mode
2991
2992     """
2993     instance = self.instance
2994
2995     source_node = instance.primary_node
2996     target_node = instance.secondary_nodes[0]
2997     all_nodes = [source_node, target_node]
2998
2999     feedback_fn("* checking disk consistency between source and target")
3000     for dev in instance.disks:
3001       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3002         raise errors.OpExecError("Disk %s is degraded or not fully"
3003                                  " synchronized on target node,"
3004                                  " aborting migrate." % dev.iv_name)
3005
3006     feedback_fn("* ensuring the target is in secondary mode")
3007     result = rpc.call_blockdev_close(target_node, instance.name,
3008                                      instance.disks)
3009     if not result or not result[0]:
3010       raise errors.BlockDeviceError("Cannot switch target node to"
3011                                     " secondary: %s" % result[1])
3012
3013     feedback_fn("* changing disks into dual-master mode")
3014     logger.Info("Changing disks for instance %s into dual-master mode" %
3015                 (instance.name,))
3016
3017     nodes_ip = {
3018       source_node: self.cfg.GetNodeInfo(source_node).secondary_ip,
3019       target_node: self.cfg.GetNodeInfo(target_node).secondary_ip,
3020       }
3021     result = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3022                                         instance.disks, nodes_ip, True)
3023     for node in all_nodes:
3024       if not result[node] or not result[node][0]:
3025         raise errors.OpExecError("Cannot change disks config on node %s,"
3026                                  " error %s" % (node, result[node][1]))
3027
3028     _WaitForSync(self.cfg, instance, self.proc)
3029
3030     feedback_fn("* migrating instance to %s" % target_node)
3031     result = rpc.call_instance_migrate(source_node, instance,
3032                                        nodes_ip[target_node], self.op.live)
3033     if not result or not result[0]:
3034       logger.Error("Instance migration failed, trying to revert disk status")
3035       res2 = rpc.call_blockdev_close(target_node, instance.name,
3036                                      instance.disks)
3037       if not res2 or not res2[0]:
3038         feedback_fn("Disk close on secondary failed, instance disks left"
3039                     " in dual-master mode")
3040       else:
3041         res2 = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3042                                           instance.disks, nodes_ip, False)
3043         if not (res2 and res2[source_node][0] and res2[target_node][0]):
3044           logger.Error("Warning: DRBD change to single-master failed: shutdown"
3045                        " manually the instance disks or via deactivate-disks,"
3046                        " and then restart them via activate-disks")
3047       raise errors.OpExecError("Could not migrate instance %s: %s" %
3048                                (instance.name, result[1]))
3049
3050     instance.primary_node = target_node
3051     # distribute new instance config to the other nodes
3052     self.cfg.Update(instance)
3053
3054     feedback_fn("* changing the instance's disks on source node to secondary")
3055     result = rpc.call_blockdev_close(source_node, instance.name,
3056                                      instance.disks)
3057     if not result or not result[0]:
3058       logger.Error("Warning: DRBD deactivation failed: shutdown manually the"
3059                    " instance disks or via deactivate-disks, and then"
3060                    " restart them via activate-disks")
3061
3062     _WaitForSync(self.cfg, instance, self.proc)
3063
3064     feedback_fn("* changing the instance's disks to single-master")
3065     result = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3066                                         instance.disks, nodes_ip, False)
3067     if not (result and result[source_node][0] and result[target_node][0]):
3068       logger.Error("Warning: DRBD change to single-master failed: shutdown"
3069                    " manually the instance disks or via deactivate-disks,"
3070                    " and then restart them via activate-disks")
3071
3072
3073 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3074   """Create a tree of block devices on the primary node.
3075
3076   This always creates all devices.
3077
3078   """
3079   if device.children:
3080     for child in device.children:
3081       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3082         return False
3083
3084   cfg.SetDiskID(device, node)
3085   new_id = rpc.call_blockdev_create(node, device, device.size,
3086                                     instance.name, True, info)
3087   if not new_id:
3088     return False
3089   if device.physical_id is None:
3090     device.physical_id = new_id
3091   return True
3092
3093
3094 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3095   """Create a tree of block devices on a secondary node.
3096
3097   If this device type has to be created on secondaries, create it and
3098   all its children.
3099
3100   If not, just recurse to children keeping the same 'force' value.
3101
3102   """
3103   if device.CreateOnSecondary():
3104     force = True
3105   if device.children:
3106     for child in device.children:
3107       if not _CreateBlockDevOnSecondary(cfg, node, instance,
3108                                         child, force, info):
3109         return False
3110
3111   if not force:
3112     return True
3113   cfg.SetDiskID(device, node)
3114   new_id = rpc.call_blockdev_create(node, device, device.size,
3115                                     instance.name, False, info)
3116   if not new_id:
3117     return False
3118   if device.physical_id is None:
3119     device.physical_id = new_id
3120   return True
3121
3122
3123 def _GenerateUniqueNames(cfg, exts):
3124   """Generate a suitable LV name.
3125
3126   This will generate a logical volume name for the given instance.
3127
3128   """
3129   results = []
3130   for val in exts:
3131     new_id = cfg.GenerateUniqueID()
3132     results.append("%s%s" % (new_id, val))
3133   return results
3134
3135
3136 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3137   """Generate a drbd device complete with its children.
3138
3139   """
3140   port = cfg.AllocatePort()
3141   vgname = cfg.GetVGName()
3142   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3143                           logical_id=(vgname, names[0]))
3144   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3145                           logical_id=(vgname, names[1]))
3146   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3147                           logical_id = (primary, secondary, port),
3148                           children = [dev_data, dev_meta])
3149   return drbd_dev
3150
3151
3152 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3153   """Generate a drbd8 device complete with its children.
3154
3155   """
3156   port = cfg.AllocatePort()
3157   vgname = cfg.GetVGName()
3158   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3159                           logical_id=(vgname, names[0]))
3160   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3161                           logical_id=(vgname, names[1]))
3162   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3163                           logical_id = (primary, secondary, port),
3164                           children = [dev_data, dev_meta],
3165                           iv_name=iv_name)
3166   return drbd_dev
3167
3168 def _GenerateDiskTemplate(cfg, template_name,
3169                           instance_name, primary_node,
3170                           secondary_nodes, disk_sz, swap_sz):
3171   """Generate the entire disk layout for a given template type.
3172
3173   """
3174   #TODO: compute space requirements
3175
3176   vgname = cfg.GetVGName()
3177   if template_name == constants.DT_DISKLESS:
3178     disks = []
3179   elif template_name == constants.DT_PLAIN:
3180     if len(secondary_nodes) != 0:
3181       raise errors.ProgrammerError("Wrong template configuration")
3182
3183     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3184     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3185                            logical_id=(vgname, names[0]),
3186                            iv_name = "sda")
3187     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3188                            logical_id=(vgname, names[1]),
3189                            iv_name = "sdb")
3190     disks = [sda_dev, sdb_dev]
3191   elif template_name == constants.DT_LOCAL_RAID1:
3192     if len(secondary_nodes) != 0:
3193       raise errors.ProgrammerError("Wrong template configuration")
3194
3195
3196     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3197                                        ".sdb_m1", ".sdb_m2"])
3198     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3199                               logical_id=(vgname, names[0]))
3200     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3201                               logical_id=(vgname, names[1]))
3202     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3203                               size=disk_sz,
3204                               children = [sda_dev_m1, sda_dev_m2])
3205     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3206                               logical_id=(vgname, names[2]))
3207     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3208                               logical_id=(vgname, names[3]))
3209     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3210                               size=swap_sz,
3211                               children = [sdb_dev_m1, sdb_dev_m2])
3212     disks = [md_sda_dev, md_sdb_dev]
3213   elif template_name == constants.DT_REMOTE_RAID1:
3214     if len(secondary_nodes) != 1:
3215       raise errors.ProgrammerError("Wrong template configuration")
3216     remote_node = secondary_nodes[0]
3217     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3218                                        ".sdb_data", ".sdb_meta"])
3219     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3220                                          disk_sz, names[0:2])
3221     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3222                               children = [drbd_sda_dev], size=disk_sz)
3223     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3224                                          swap_sz, names[2:4])
3225     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3226                               children = [drbd_sdb_dev], size=swap_sz)
3227     disks = [md_sda_dev, md_sdb_dev]
3228   elif template_name == constants.DT_DRBD8:
3229     if len(secondary_nodes) != 1:
3230       raise errors.ProgrammerError("Wrong template configuration")
3231     remote_node = secondary_nodes[0]
3232     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3233                                        ".sdb_data", ".sdb_meta"])
3234     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3235                                          disk_sz, names[0:2], "sda")
3236     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3237                                          swap_sz, names[2:4], "sdb")
3238     disks = [drbd_sda_dev, drbd_sdb_dev]
3239   else:
3240     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3241   return disks
3242
3243
3244 def _GetInstanceInfoText(instance):
3245   """Compute that text that should be added to the disk's metadata.
3246
3247   """
3248   return "originstname+%s" % instance.name
3249
3250
3251 def _CreateDisks(cfg, instance):
3252   """Create all disks for an instance.
3253
3254   This abstracts away some work from AddInstance.
3255
3256   Args:
3257     instance: the instance object
3258
3259   Returns:
3260     True or False showing the success of the creation process
3261
3262   """
3263   info = _GetInstanceInfoText(instance)
3264
3265   for device in instance.disks:
3266     logger.Info("creating volume %s for instance %s" %
3267               (device.iv_name, instance.name))
3268     #HARDCODE
3269     for secondary_node in instance.secondary_nodes:
3270       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3271                                         device, False, info):
3272         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3273                      (device.iv_name, device, secondary_node))
3274         return False
3275     #HARDCODE
3276     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3277                                     instance, device, info):
3278       logger.Error("failed to create volume %s on primary!" %
3279                    device.iv_name)
3280       return False
3281   return True
3282
3283
3284 def _RemoveDisks(instance, cfg):
3285   """Remove all disks for an instance.
3286
3287   This abstracts away some work from `AddInstance()` and
3288   `RemoveInstance()`. Note that in case some of the devices couldn't
3289   be removed, the removal will continue with the other ones (compare
3290   with `_CreateDisks()`).
3291
3292   Args:
3293     instance: the instance object
3294
3295   Returns:
3296     True or False showing the success of the removal proces
3297
3298   """
3299   logger.Info("removing block devices for instance %s" % instance.name)
3300
3301   result = True
3302   for device in instance.disks:
3303     for node, disk in device.ComputeNodeTree(instance.primary_node):
3304       cfg.SetDiskID(disk, node)
3305       if not rpc.call_blockdev_remove(node, disk):
3306         logger.Error("could not remove block device %s on node %s,"
3307                      " continuing anyway" %
3308                      (device.iv_name, node))
3309         result = False
3310   return result
3311
3312
3313 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3314   """Compute disk size requirements in the volume group
3315
3316   This is currently hard-coded for the two-drive layout.
3317
3318   """
3319   # Required free disk space as a function of disk and swap space
3320   req_size_dict = {
3321     constants.DT_DISKLESS: None,
3322     constants.DT_PLAIN: disk_size + swap_size,
3323     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3324     # 256 MB are added for drbd metadata, 128MB for each drbd device
3325     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3326     constants.DT_DRBD8: disk_size + swap_size + 256,
3327   }
3328
3329   if disk_template not in req_size_dict:
3330     raise errors.ProgrammerError("Disk template '%s' size requirement"
3331                                  " is unknown" %  disk_template)
3332
3333   return req_size_dict[disk_template]
3334
3335
3336 class LUCreateInstance(LogicalUnit):
3337   """Create an instance.
3338
3339   """
3340   HPATH = "instance-add"
3341   HTYPE = constants.HTYPE_INSTANCE
3342   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3343               "disk_template", "swap_size", "mode", "start", "vcpus",
3344               "wait_for_sync", "ip_check", "mac"]
3345
3346   def _RunAllocator(self):
3347     """Run the allocator based on input opcode.
3348
3349     """
3350     disks = [{"size": self.op.disk_size, "mode": "w"},
3351              {"size": self.op.swap_size, "mode": "w"}]
3352     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3353              "bridge": self.op.bridge}]
3354     ial = IAllocator(self.cfg, self.sstore,
3355                      mode=constants.IALLOCATOR_MODE_ALLOC,
3356                      name=self.op.instance_name,
3357                      disk_template=self.op.disk_template,
3358                      tags=[],
3359                      os=self.op.os_type,
3360                      vcpus=self.op.vcpus,
3361                      mem_size=self.op.mem_size,
3362                      disks=disks,
3363                      nics=nics,
3364                      )
3365
3366     ial.Run(self.op.iallocator)
3367
3368     if not ial.success:
3369       raise errors.OpPrereqError("Can't compute nodes using"
3370                                  " iallocator '%s': %s" % (self.op.iallocator,
3371                                                            ial.info))
3372     if len(ial.nodes) != ial.required_nodes:
3373       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3374                                  " of nodes (%s), required %s" %
3375                                  (len(ial.nodes), ial.required_nodes))
3376     self.op.pnode = ial.nodes[0]
3377     logger.ToStdout("Selected nodes for the instance: %s" %
3378                     (", ".join(ial.nodes),))
3379     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3380                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3381     if ial.required_nodes == 2:
3382       self.op.snode = ial.nodes[1]
3383
3384   def BuildHooksEnv(self):
3385     """Build hooks env.
3386
3387     This runs on master, primary and secondary nodes of the instance.
3388
3389     """
3390     env = {
3391       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3392       "INSTANCE_DISK_SIZE": self.op.disk_size,
3393       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3394       "INSTANCE_ADD_MODE": self.op.mode,
3395       }
3396     if self.op.mode == constants.INSTANCE_IMPORT:
3397       env["INSTANCE_SRC_NODE"] = self.op.src_node
3398       env["INSTANCE_SRC_PATH"] = self.op.src_path
3399       env["INSTANCE_SRC_IMAGE"] = self.src_image
3400
3401     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3402       primary_node=self.op.pnode,
3403       secondary_nodes=self.secondaries,
3404       status=self.instance_status,
3405       os_type=self.op.os_type,
3406       memory=self.op.mem_size,
3407       vcpus=self.op.vcpus,
3408       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3409     ))
3410
3411     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3412           self.secondaries)
3413     return env, nl, nl
3414
3415
3416   def CheckPrereq(self):
3417     """Check prerequisites.
3418
3419     """
3420     # set optional parameters to none if they don't exist
3421     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3422                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3423                  "vnc_bind_address"]:
3424       if not hasattr(self.op, attr):
3425         setattr(self.op, attr, None)
3426
3427     if self.op.mode not in (constants.INSTANCE_CREATE,
3428                             constants.INSTANCE_IMPORT):
3429       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3430                                  self.op.mode)
3431
3432     if self.op.mode == constants.INSTANCE_IMPORT:
3433       src_node = getattr(self.op, "src_node", None)
3434       src_path = getattr(self.op, "src_path", None)
3435       if src_node is None or src_path is None:
3436         raise errors.OpPrereqError("Importing an instance requires source"
3437                                    " node and path options")
3438       src_node_full = self.cfg.ExpandNodeName(src_node)
3439       if src_node_full is None:
3440         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3441       self.op.src_node = src_node = src_node_full
3442
3443       if not os.path.isabs(src_path):
3444         raise errors.OpPrereqError("The source path must be absolute")
3445
3446       export_info = rpc.call_export_info(src_node, src_path)
3447
3448       if not export_info:
3449         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3450
3451       if not export_info.has_section(constants.INISECT_EXP):
3452         raise errors.ProgrammerError("Corrupted export config")
3453
3454       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3455       if (int(ei_version) != constants.EXPORT_VERSION):
3456         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3457                                    (ei_version, constants.EXPORT_VERSION))
3458
3459       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3460         raise errors.OpPrereqError("Can't import instance with more than"
3461                                    " one data disk")
3462
3463       # FIXME: are the old os-es, disk sizes, etc. useful?
3464       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3465       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3466                                                          'disk0_dump'))
3467       self.src_image = diskimage
3468     else: # INSTANCE_CREATE
3469       if getattr(self.op, "os_type", None) is None:
3470         raise errors.OpPrereqError("No guest OS specified")
3471
3472     #### instance parameters check
3473
3474     # disk template and mirror node verification
3475     if self.op.disk_template not in constants.DISK_TEMPLATES:
3476       raise errors.OpPrereqError("Invalid disk template name")
3477
3478     # instance name verification
3479     hostname1 = utils.HostInfo(self.op.instance_name)
3480
3481     self.op.instance_name = instance_name = hostname1.name
3482     instance_list = self.cfg.GetInstanceList()
3483     if instance_name in instance_list:
3484       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3485                                  instance_name)
3486
3487     # ip validity checks
3488     ip = getattr(self.op, "ip", None)
3489     if ip is None or ip.lower() == "none":
3490       inst_ip = None
3491     elif ip.lower() == "auto":
3492       inst_ip = hostname1.ip
3493     else:
3494       if not utils.IsValidIP(ip):
3495         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3496                                    " like a valid IP" % ip)
3497       inst_ip = ip
3498     self.inst_ip = self.op.ip = inst_ip
3499
3500     if self.op.start and not self.op.ip_check:
3501       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3502                                  " adding an instance in start mode")
3503
3504     if self.op.ip_check:
3505       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3506         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3507                                    (hostname1.ip, instance_name))
3508
3509     # MAC address verification
3510     if self.op.mac != "auto":
3511       if not utils.IsValidMac(self.op.mac.lower()):
3512         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3513                                    self.op.mac)
3514
3515     # bridge verification
3516     bridge = getattr(self.op, "bridge", None)
3517     if bridge is None:
3518       self.op.bridge = self.cfg.GetDefBridge()
3519     else:
3520       self.op.bridge = bridge
3521
3522     # boot order verification
3523     if self.op.hvm_boot_order is not None:
3524       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3525         raise errors.OpPrereqError("invalid boot order specified,"
3526                                    " must be one or more of [acdn]")
3527     #### allocator run
3528
3529     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3530       raise errors.OpPrereqError("One and only one of iallocator and primary"
3531                                  " node must be given")
3532
3533     if self.op.iallocator is not None:
3534       self._RunAllocator()
3535
3536     #### node related checks
3537
3538     # check primary node
3539     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3540     if pnode is None:
3541       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3542                                  self.op.pnode)
3543     self.op.pnode = pnode.name
3544     self.pnode = pnode
3545     self.secondaries = []
3546
3547     # mirror node verification
3548     if self.op.disk_template in constants.DTS_NET_MIRROR:
3549       if getattr(self.op, "snode", None) is None:
3550         raise errors.OpPrereqError("The networked disk templates need"
3551                                    " a mirror node")
3552
3553       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3554       if snode_name is None:
3555         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3556                                    self.op.snode)
3557       elif snode_name == pnode.name:
3558         raise errors.OpPrereqError("The secondary node cannot be"
3559                                    " the primary node.")
3560       self.secondaries.append(snode_name)
3561
3562     req_size = _ComputeDiskSize(self.op.disk_template,
3563                                 self.op.disk_size, self.op.swap_size)
3564
3565     # Check lv size requirements
3566     if req_size is not None:
3567       nodenames = [pnode.name] + self.secondaries
3568       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3569       for node in nodenames:
3570         info = nodeinfo.get(node, None)
3571         if not info:
3572           raise errors.OpPrereqError("Cannot get current information"
3573                                      " from node '%s'" % node)
3574         vg_free = info.get('vg_free', None)
3575         if not isinstance(vg_free, int):
3576           raise errors.OpPrereqError("Can't compute free disk space on"
3577                                      " node %s" % node)
3578         if req_size > info['vg_free']:
3579           raise errors.OpPrereqError("Not enough disk space on target node %s."
3580                                      " %d MB available, %d MB required" %
3581                                      (node, info['vg_free'], req_size))
3582
3583     # os verification
3584     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3585     if not os_obj:
3586       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3587                                  " primary node"  % self.op.os_type)
3588
3589     if self.op.kernel_path == constants.VALUE_NONE:
3590       raise errors.OpPrereqError("Can't set instance kernel to none")
3591
3592
3593     # bridge check on primary node
3594     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3595       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3596                                  " destination node '%s'" %
3597                                  (self.op.bridge, pnode.name))
3598
3599     # memory check on primary node
3600     if self.op.start:
3601       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3602                            "creating instance %s" % self.op.instance_name,
3603                            self.op.mem_size)
3604
3605     # hvm_cdrom_image_path verification
3606     if self.op.hvm_cdrom_image_path is not None:
3607       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3608         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3609                                    " be an absolute path or None, not %s" %
3610                                    self.op.hvm_cdrom_image_path)
3611       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3612         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3613                                    " regular file or a symlink pointing to"
3614                                    " an existing regular file, not %s" %
3615                                    self.op.hvm_cdrom_image_path)
3616
3617     # vnc_bind_address verification
3618     if self.op.vnc_bind_address is not None:
3619       if not utils.IsValidIP(self.op.vnc_bind_address):
3620         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3621                                    " like a valid IP address" %
3622                                    self.op.vnc_bind_address)
3623
3624     if self.op.start:
3625       self.instance_status = 'up'
3626     else:
3627       self.instance_status = 'down'
3628
3629   def Exec(self, feedback_fn):
3630     """Create and add the instance to the cluster.
3631
3632     """
3633     instance = self.op.instance_name
3634     pnode_name = self.pnode.name
3635
3636     if self.op.mac == "auto":
3637       mac_address = self.cfg.GenerateMAC()
3638     else:
3639       mac_address = self.op.mac
3640
3641     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3642     if self.inst_ip is not None:
3643       nic.ip = self.inst_ip
3644
3645     ht_kind = self.sstore.GetHypervisorType()
3646     if ht_kind in constants.HTS_REQ_PORT:
3647       network_port = self.cfg.AllocatePort()
3648     else:
3649       network_port = None
3650
3651     if self.op.vnc_bind_address is None:
3652       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3653
3654     disks = _GenerateDiskTemplate(self.cfg,
3655                                   self.op.disk_template,
3656                                   instance, pnode_name,
3657                                   self.secondaries, self.op.disk_size,
3658                                   self.op.swap_size)
3659
3660     iobj = objects.Instance(name=instance, os=self.op.os_type,
3661                             primary_node=pnode_name,
3662                             memory=self.op.mem_size,
3663                             vcpus=self.op.vcpus,
3664                             nics=[nic], disks=disks,
3665                             disk_template=self.op.disk_template,
3666                             status=self.instance_status,
3667                             network_port=network_port,
3668                             kernel_path=self.op.kernel_path,
3669                             initrd_path=self.op.initrd_path,
3670                             hvm_boot_order=self.op.hvm_boot_order,
3671                             hvm_acpi=self.op.hvm_acpi,
3672                             hvm_pae=self.op.hvm_pae,
3673                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3674                             vnc_bind_address=self.op.vnc_bind_address,
3675                             )
3676
3677     feedback_fn("* creating instance disks...")
3678     if not _CreateDisks(self.cfg, iobj):
3679       _RemoveDisks(iobj, self.cfg)
3680       raise errors.OpExecError("Device creation failed, reverting...")
3681
3682     feedback_fn("adding instance %s to cluster config" % instance)
3683
3684     self.cfg.AddInstance(iobj)
3685
3686     if self.op.wait_for_sync:
3687       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3688     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3689       # make sure the disks are not degraded (still sync-ing is ok)
3690       time.sleep(15)
3691       feedback_fn("* checking mirrors status")
3692       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3693     else:
3694       disk_abort = False
3695
3696     if disk_abort:
3697       _RemoveDisks(iobj, self.cfg)
3698       self.cfg.RemoveInstance(iobj.name)
3699       raise errors.OpExecError("There are some degraded disks for"
3700                                " this instance")
3701
3702     feedback_fn("creating os for instance %s on node %s" %
3703                 (instance, pnode_name))
3704
3705     if iobj.disk_template != constants.DT_DISKLESS:
3706       if self.op.mode == constants.INSTANCE_CREATE:
3707         feedback_fn("* running the instance OS create scripts...")
3708         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3709           raise errors.OpExecError("could not add os for instance %s"
3710                                    " on node %s" %
3711                                    (instance, pnode_name))
3712
3713       elif self.op.mode == constants.INSTANCE_IMPORT:
3714         feedback_fn("* running the instance OS import scripts...")
3715         src_node = self.op.src_node
3716         src_image = self.src_image
3717         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3718                                                 src_node, src_image):
3719           raise errors.OpExecError("Could not import os for instance"
3720                                    " %s on node %s" %
3721                                    (instance, pnode_name))
3722       else:
3723         # also checked in the prereq part
3724         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3725                                      % self.op.mode)
3726
3727     if self.op.start:
3728       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3729       feedback_fn("* starting instance...")
3730       if not rpc.call_instance_start(pnode_name, iobj, None):
3731         raise errors.OpExecError("Could not start instance")
3732
3733
3734 class LUConnectConsole(NoHooksLU):
3735   """Connect to an instance's console.
3736
3737   This is somewhat special in that it returns the command line that
3738   you need to run on the master node in order to connect to the
3739   console.
3740
3741   """
3742   _OP_REQP = ["instance_name"]
3743
3744   def CheckPrereq(self):
3745     """Check prerequisites.
3746
3747     This checks that the instance is in the cluster.
3748
3749     """
3750     instance = self.cfg.GetInstanceInfo(
3751       self.cfg.ExpandInstanceName(self.op.instance_name))
3752     if instance is None:
3753       raise errors.OpPrereqError("Instance '%s' not known" %
3754                                  self.op.instance_name)
3755     self.instance = instance
3756
3757   def Exec(self, feedback_fn):
3758     """Connect to the console of an instance
3759
3760     """
3761     instance = self.instance
3762     node = instance.primary_node
3763
3764     node_insts = rpc.call_instance_list([node])[node]
3765     if node_insts is False:
3766       raise errors.OpExecError("Can't connect to node %s." % node)
3767
3768     if instance.name not in node_insts:
3769       raise errors.OpExecError("Instance %s is not running." % instance.name)
3770
3771     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3772
3773     hyper = hypervisor.GetHypervisor()
3774     console_cmd = hyper.GetShellCommandForConsole(instance)
3775     # build ssh cmdline
3776     argv = ["ssh", "-q", "-t"]
3777     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3778     argv.extend(ssh.BATCH_MODE_OPTS)
3779     argv.append(node)
3780     argv.append(console_cmd)
3781     return "ssh", argv
3782
3783
3784 class LUAddMDDRBDComponent(LogicalUnit):
3785   """Adda new mirror member to an instance's disk.
3786
3787   """
3788   HPATH = "mirror-add"
3789   HTYPE = constants.HTYPE_INSTANCE
3790   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3791
3792   def BuildHooksEnv(self):
3793     """Build hooks env.
3794
3795     This runs on the master, the primary and all the secondaries.
3796
3797     """
3798     env = {
3799       "NEW_SECONDARY": self.op.remote_node,
3800       "DISK_NAME": self.op.disk_name,
3801       }
3802     env.update(_BuildInstanceHookEnvByObject(self.instance))
3803     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3804           self.op.remote_node,] + list(self.instance.secondary_nodes)
3805     return env, nl, nl
3806
3807   def CheckPrereq(self):
3808     """Check prerequisites.
3809
3810     This checks that the instance is in the cluster.
3811
3812     """
3813     instance = self.cfg.GetInstanceInfo(
3814       self.cfg.ExpandInstanceName(self.op.instance_name))
3815     if instance is None:
3816       raise errors.OpPrereqError("Instance '%s' not known" %
3817                                  self.op.instance_name)
3818     self.instance = instance
3819
3820     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3821     if remote_node is None:
3822       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3823     self.remote_node = remote_node
3824
3825     if remote_node == instance.primary_node:
3826       raise errors.OpPrereqError("The specified node is the primary node of"
3827                                  " the instance.")
3828
3829     if instance.disk_template != constants.DT_REMOTE_RAID1:
3830       raise errors.OpPrereqError("Instance's disk layout is not"
3831                                  " remote_raid1.")
3832     for disk in instance.disks:
3833       if disk.iv_name == self.op.disk_name:
3834         break
3835     else:
3836       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3837                                  " instance." % self.op.disk_name)
3838     if len(disk.children) > 1:
3839       raise errors.OpPrereqError("The device already has two slave devices."
3840                                  " This would create a 3-disk raid1 which we"
3841                                  " don't allow.")
3842     self.disk = disk
3843
3844   def Exec(self, feedback_fn):
3845     """Add the mirror component
3846
3847     """
3848     disk = self.disk
3849     instance = self.instance
3850
3851     remote_node = self.remote_node
3852     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3853     names = _GenerateUniqueNames(self.cfg, lv_names)
3854     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3855                                      remote_node, disk.size, names)
3856
3857     logger.Info("adding new mirror component on secondary")
3858     #HARDCODE
3859     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3860                                       new_drbd, False,
3861                                       _GetInstanceInfoText(instance)):
3862       raise errors.OpExecError("Failed to create new component on secondary"
3863                                " node %s" % remote_node)
3864
3865     logger.Info("adding new mirror component on primary")
3866     #HARDCODE
3867     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3868                                     instance, new_drbd,
3869                                     _GetInstanceInfoText(instance)):
3870       # remove secondary dev
3871       self.cfg.SetDiskID(new_drbd, remote_node)
3872       rpc.call_blockdev_remove(remote_node, new_drbd)
3873       raise errors.OpExecError("Failed to create volume on primary")
3874
3875     # the device exists now
3876     # call the primary node to add the mirror to md
3877     logger.Info("adding new mirror component to md")
3878     if not rpc.call_blockdev_addchildren(instance.primary_node,
3879                                          disk, [new_drbd]):
3880       logger.Error("Can't add mirror compoment to md!")
3881       self.cfg.SetDiskID(new_drbd, remote_node)
3882       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3883         logger.Error("Can't rollback on secondary")
3884       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3885       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3886         logger.Error("Can't rollback on primary")
3887       raise errors.OpExecError("Can't add mirror component to md array")
3888
3889     disk.children.append(new_drbd)
3890
3891     self.cfg.AddInstance(instance)
3892
3893     _WaitForSync(self.cfg, instance, self.proc)
3894
3895     return 0
3896
3897
3898 class LURemoveMDDRBDComponent(LogicalUnit):
3899   """Remove a component from a remote_raid1 disk.
3900
3901   """
3902   HPATH = "mirror-remove"
3903   HTYPE = constants.HTYPE_INSTANCE
3904   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3905
3906   def BuildHooksEnv(self):
3907     """Build hooks env.
3908
3909     This runs on the master, the primary and all the secondaries.
3910
3911     """
3912     env = {
3913       "DISK_NAME": self.op.disk_name,
3914       "DISK_ID": self.op.disk_id,
3915       "OLD_SECONDARY": self.old_secondary,
3916       }
3917     env.update(_BuildInstanceHookEnvByObject(self.instance))
3918     nl = [self.sstore.GetMasterNode(),
3919           self.instance.primary_node] + list(self.instance.secondary_nodes)
3920     return env, nl, nl
3921
3922   def CheckPrereq(self):
3923     """Check prerequisites.
3924
3925     This checks that the instance is in the cluster.
3926
3927     """
3928     instance = self.cfg.GetInstanceInfo(
3929       self.cfg.ExpandInstanceName(self.op.instance_name))
3930     if instance is None:
3931       raise errors.OpPrereqError("Instance '%s' not known" %
3932                                  self.op.instance_name)
3933     self.instance = instance
3934
3935     if instance.disk_template != constants.DT_REMOTE_RAID1:
3936       raise errors.OpPrereqError("Instance's disk layout is not"
3937                                  " remote_raid1.")
3938     for disk in instance.disks:
3939       if disk.iv_name == self.op.disk_name:
3940         break
3941     else:
3942       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3943                                  " instance." % self.op.disk_name)
3944     for child in disk.children:
3945       if (child.dev_type == constants.LD_DRBD7 and
3946           child.logical_id[2] == self.op.disk_id):
3947         break
3948     else:
3949       raise errors.OpPrereqError("Can't find the device with this port.")
3950
3951     if len(disk.children) < 2:
3952       raise errors.OpPrereqError("Cannot remove the last component from"
3953                                  " a mirror.")
3954     self.disk = disk
3955     self.child = child
3956     if self.child.logical_id[0] == instance.primary_node:
3957       oid = 1
3958     else:
3959       oid = 0
3960     self.old_secondary = self.child.logical_id[oid]
3961
3962   def Exec(self, feedback_fn):
3963     """Remove the mirror component
3964
3965     """
3966     instance = self.instance
3967     disk = self.disk
3968     child = self.child
3969     logger.Info("remove mirror component")
3970     self.cfg.SetDiskID(disk, instance.primary_node)
3971     if not rpc.call_blockdev_removechildren(instance.primary_node,
3972                                             disk, [child]):
3973       raise errors.OpExecError("Can't remove child from mirror.")
3974
3975     for node in child.logical_id[:2]:
3976       self.cfg.SetDiskID(child, node)
3977       if not rpc.call_blockdev_remove(node, child):
3978         logger.Error("Warning: failed to remove device from node %s,"
3979                      " continuing operation." % node)
3980
3981     disk.children.remove(child)
3982     self.cfg.AddInstance(instance)
3983
3984
3985 class LUReplaceDisks(LogicalUnit):
3986   """Replace the disks of an instance.
3987
3988   """
3989   HPATH = "mirrors-replace"
3990   HTYPE = constants.HTYPE_INSTANCE
3991   _OP_REQP = ["instance_name", "mode", "disks"]
3992
3993   def _RunAllocator(self):
3994     """Compute a new secondary node using an IAllocator.
3995
3996     """
3997     ial = IAllocator(self.cfg, self.sstore,
3998                      mode=constants.IALLOCATOR_MODE_RELOC,
3999                      name=self.op.instance_name,
4000                      relocate_from=[self.sec_node])
4001
4002     ial.Run(self.op.iallocator)
4003
4004     if not ial.success:
4005       raise errors.OpPrereqError("Can't compute nodes using"
4006                                  " iallocator '%s': %s" % (self.op.iallocator,
4007                                                            ial.info))
4008     if len(ial.nodes) != ial.required_nodes:
4009       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4010                                  " of nodes (%s), required %s" %
4011                                  (len(ial.nodes), ial.required_nodes))
4012     self.op.remote_node = ial.nodes[0]
4013     logger.ToStdout("Selected new secondary for the instance: %s" %
4014                     self.op.remote_node)
4015
4016   def BuildHooksEnv(self):
4017     """Build hooks env.
4018
4019     This runs on the master, the primary and all the secondaries.
4020
4021     """
4022     env = {
4023       "MODE": self.op.mode,
4024       "NEW_SECONDARY": self.op.remote_node,
4025       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4026       }
4027     env.update(_BuildInstanceHookEnvByObject(self.instance))
4028     nl = [
4029       self.sstore.GetMasterNode(),
4030       self.instance.primary_node,
4031       ]
4032     if self.op.remote_node is not None:
4033       nl.append(self.op.remote_node)
4034     return env, nl, nl
4035
4036   def CheckPrereq(self):
4037     """Check prerequisites.
4038
4039     This checks that the instance is in the cluster.
4040
4041     """
4042     if not hasattr(self.op, "remote_node"):
4043       self.op.remote_node = None
4044
4045     instance = self.cfg.GetInstanceInfo(
4046       self.cfg.ExpandInstanceName(self.op.instance_name))
4047     if instance is None:
4048       raise errors.OpPrereqError("Instance '%s' not known" %
4049                                  self.op.instance_name)
4050     self.instance = instance
4051     self.op.instance_name = instance.name
4052
4053     if instance.disk_template not in constants.DTS_NET_MIRROR:
4054       raise errors.OpPrereqError("Instance's disk layout is not"
4055                                  " network mirrored.")
4056
4057     if len(instance.secondary_nodes) != 1:
4058       raise errors.OpPrereqError("The instance has a strange layout,"
4059                                  " expected one secondary but found %d" %
4060                                  len(instance.secondary_nodes))
4061
4062     self.sec_node = instance.secondary_nodes[0]
4063
4064     ia_name = getattr(self.op, "iallocator", None)
4065     if ia_name is not None:
4066       if self.op.remote_node is not None:
4067         raise errors.OpPrereqError("Give either the iallocator or the new"
4068                                    " secondary, not both")
4069       self.op.remote_node = self._RunAllocator()
4070
4071     remote_node = self.op.remote_node
4072     if remote_node is not None:
4073       remote_node = self.cfg.ExpandNodeName(remote_node)
4074       if remote_node is None:
4075         raise errors.OpPrereqError("Node '%s' not known" %
4076                                    self.op.remote_node)
4077       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4078     else:
4079       self.remote_node_info = None
4080     if remote_node == instance.primary_node:
4081       raise errors.OpPrereqError("The specified node is the primary node of"
4082                                  " the instance.")
4083     elif remote_node == self.sec_node:
4084       if self.op.mode == constants.REPLACE_DISK_SEC:
4085         # this is for DRBD8, where we can't execute the same mode of
4086         # replacement as for drbd7 (no different port allocated)
4087         raise errors.OpPrereqError("Same secondary given, cannot execute"
4088                                    " replacement")
4089       # the user gave the current secondary, switch to
4090       # 'no-replace-secondary' mode for drbd7
4091       remote_node = None
4092     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4093         self.op.mode != constants.REPLACE_DISK_ALL):
4094       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4095                                  " disks replacement, not individual ones")
4096     if instance.disk_template == constants.DT_DRBD8:
4097       if (self.op.mode == constants.REPLACE_DISK_ALL and
4098           remote_node is not None):
4099         # switch to replace secondary mode
4100         self.op.mode = constants.REPLACE_DISK_SEC
4101
4102       if self.op.mode == constants.REPLACE_DISK_ALL:
4103         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4104                                    " secondary disk replacement, not"
4105                                    " both at once")
4106       elif self.op.mode == constants.REPLACE_DISK_PRI:
4107         if remote_node is not None:
4108           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4109                                      " the secondary while doing a primary"
4110                                      " node disk replacement")
4111         self.tgt_node = instance.primary_node
4112         self.oth_node = instance.secondary_nodes[0]
4113       elif self.op.mode == constants.REPLACE_DISK_SEC:
4114         self.new_node = remote_node # this can be None, in which case
4115                                     # we don't change the secondary
4116         self.tgt_node = instance.secondary_nodes[0]
4117         self.oth_node = instance.primary_node
4118       else:
4119         raise errors.ProgrammerError("Unhandled disk replace mode")
4120
4121     for name in self.op.disks:
4122       if instance.FindDisk(name) is None:
4123         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4124                                    (name, instance.name))
4125     self.op.remote_node = remote_node
4126
4127   def _ExecRR1(self, feedback_fn):
4128     """Replace the disks of an instance.
4129
4130     """
4131     instance = self.instance
4132     iv_names = {}
4133     # start of work
4134     if self.op.remote_node is None:
4135       remote_node = self.sec_node
4136     else:
4137       remote_node = self.op.remote_node
4138     cfg = self.cfg
4139     for dev in instance.disks:
4140       size = dev.size
4141       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4142       names = _GenerateUniqueNames(cfg, lv_names)
4143       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4144                                        remote_node, size, names)
4145       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4146       logger.Info("adding new mirror component on secondary for %s" %
4147                   dev.iv_name)
4148       #HARDCODE
4149       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4150                                         new_drbd, False,
4151                                         _GetInstanceInfoText(instance)):
4152         raise errors.OpExecError("Failed to create new component on secondary"
4153                                  " node %s. Full abort, cleanup manually!" %
4154                                  remote_node)
4155
4156       logger.Info("adding new mirror component on primary")
4157       #HARDCODE
4158       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4159                                       instance, new_drbd,
4160                                       _GetInstanceInfoText(instance)):
4161         # remove secondary dev
4162         cfg.SetDiskID(new_drbd, remote_node)
4163         rpc.call_blockdev_remove(remote_node, new_drbd)
4164         raise errors.OpExecError("Failed to create volume on primary!"
4165                                  " Full abort, cleanup manually!!")
4166
4167       # the device exists now
4168       # call the primary node to add the mirror to md
4169       logger.Info("adding new mirror component to md")
4170       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4171                                            [new_drbd]):
4172         logger.Error("Can't add mirror compoment to md!")
4173         cfg.SetDiskID(new_drbd, remote_node)
4174         if not rpc.call_blockdev_remove(remote_node, new_drbd):
4175           logger.Error("Can't rollback on secondary")
4176         cfg.SetDiskID(new_drbd, instance.primary_node)
4177         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4178           logger.Error("Can't rollback on primary")
4179         raise errors.OpExecError("Full abort, cleanup manually!!")
4180
4181       dev.children.append(new_drbd)
4182       cfg.AddInstance(instance)
4183
4184     # this can fail as the old devices are degraded and _WaitForSync
4185     # does a combined result over all disks, so we don't check its
4186     # return value
4187     _WaitForSync(cfg, instance, self.proc, unlock=True)
4188
4189     # so check manually all the devices
4190     for name in iv_names:
4191       dev, child, new_drbd = iv_names[name]
4192       cfg.SetDiskID(dev, instance.primary_node)
4193       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4194       if is_degr:
4195         raise errors.OpExecError("MD device %s is degraded!" % name)
4196       cfg.SetDiskID(new_drbd, instance.primary_node)
4197       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4198       if is_degr:
4199         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4200
4201     for name in iv_names:
4202       dev, child, new_drbd = iv_names[name]
4203       logger.Info("remove mirror %s component" % name)
4204       cfg.SetDiskID(dev, instance.primary_node)
4205       if not rpc.call_blockdev_removechildren(instance.primary_node,
4206                                               dev, [child]):
4207         logger.Error("Can't remove child from mirror, aborting"
4208                      " *this device cleanup*.\nYou need to cleanup manually!!")
4209         continue
4210
4211       for node in child.logical_id[:2]:
4212         logger.Info("remove child device on %s" % node)
4213         cfg.SetDiskID(child, node)
4214         if not rpc.call_blockdev_remove(node, child):
4215           logger.Error("Warning: failed to remove device from node %s,"
4216                        " continuing operation." % node)
4217
4218       dev.children.remove(child)
4219
4220       cfg.AddInstance(instance)
4221
4222   def _ExecD8DiskOnly(self, feedback_fn):
4223     """Replace a disk on the primary or secondary for dbrd8.
4224
4225     The algorithm for replace is quite complicated:
4226       - for each disk to be replaced:
4227         - create new LVs on the target node with unique names
4228         - detach old LVs from the drbd device
4229         - rename old LVs to name_replaced.<time_t>
4230         - rename new LVs to old LVs
4231         - attach the new LVs (with the old names now) to the drbd device
4232       - wait for sync across all devices
4233       - for each modified disk:
4234         - remove old LVs (which have the name name_replaces.<time_t>)
4235
4236     Failures are not very well handled.
4237
4238     """
4239     steps_total = 6
4240     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4241     instance = self.instance
4242     iv_names = {}
4243     vgname = self.cfg.GetVGName()
4244     # start of work
4245     cfg = self.cfg
4246     tgt_node = self.tgt_node
4247     oth_node = self.oth_node
4248
4249     # Step: check device activation
4250     self.proc.LogStep(1, steps_total, "check device existence")
4251     info("checking volume groups")
4252     my_vg = cfg.GetVGName()
4253     results = rpc.call_vg_list([oth_node, tgt_node])
4254     if not results:
4255       raise errors.OpExecError("Can't list volume groups on the nodes")
4256     for node in oth_node, tgt_node:
4257       res = results.get(node, False)
4258       if not res or my_vg not in res:
4259         raise errors.OpExecError("Volume group '%s' not found on %s" %
4260                                  (my_vg, node))
4261     for dev in instance.disks:
4262       if not dev.iv_name in self.op.disks:
4263         continue
4264       for node in tgt_node, oth_node:
4265         info("checking %s on %s" % (dev.iv_name, node))
4266         cfg.SetDiskID(dev, node)
4267         if not rpc.call_blockdev_find(node, dev):
4268           raise errors.OpExecError("Can't find device %s on node %s" %
4269                                    (dev.iv_name, node))
4270
4271     # Step: check other node consistency
4272     self.proc.LogStep(2, steps_total, "check peer consistency")
4273     for dev in instance.disks:
4274       if not dev.iv_name in self.op.disks:
4275         continue
4276       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4277       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4278                                    oth_node==instance.primary_node):
4279         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4280                                  " to replace disks on this node (%s)" %
4281                                  (oth_node, tgt_node))
4282
4283     # Step: create new storage
4284     self.proc.LogStep(3, steps_total, "allocate new storage")
4285     for dev in instance.disks:
4286       if not dev.iv_name in self.op.disks:
4287         continue
4288       size = dev.size
4289       cfg.SetDiskID(dev, tgt_node)
4290       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4291       names = _GenerateUniqueNames(cfg, lv_names)
4292       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4293                              logical_id=(vgname, names[0]))
4294       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4295                              logical_id=(vgname, names[1]))
4296       new_lvs = [lv_data, lv_meta]
4297       old_lvs = dev.children
4298       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4299       info("creating new local storage on %s for %s" %
4300            (tgt_node, dev.iv_name))
4301       # since we *always* want to create this LV, we use the
4302       # _Create...OnPrimary (which forces the creation), even if we
4303       # are talking about the secondary node
4304       for new_lv in new_lvs:
4305         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4306                                         _GetInstanceInfoText(instance)):
4307           raise errors.OpExecError("Failed to create new LV named '%s' on"
4308                                    " node '%s'" %
4309                                    (new_lv.logical_id[1], tgt_node))
4310
4311     # Step: for each lv, detach+rename*2+attach
4312     self.proc.LogStep(4, steps_total, "change drbd configuration")
4313     for dev, old_lvs, new_lvs in iv_names.itervalues():
4314       info("detaching %s drbd from local storage" % dev.iv_name)
4315       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4316         raise errors.OpExecError("Can't detach drbd from local storage on node"
4317                                  " %s for device %s" % (tgt_node, dev.iv_name))
4318       #dev.children = []
4319       #cfg.Update(instance)
4320
4321       # ok, we created the new LVs, so now we know we have the needed
4322       # storage; as such, we proceed on the target node to rename
4323       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4324       # using the assumption that logical_id == physical_id (which in
4325       # turn is the unique_id on that node)
4326
4327       # FIXME(iustin): use a better name for the replaced LVs
4328       temp_suffix = int(time.time())
4329       ren_fn = lambda d, suff: (d.physical_id[0],
4330                                 d.physical_id[1] + "_replaced-%s" % suff)
4331       # build the rename list based on what LVs exist on the node
4332       rlist = []
4333       for to_ren in old_lvs:
4334         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4335         if find_res is not None: # device exists
4336           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4337
4338       info("renaming the old LVs on the target node")
4339       if not rpc.call_blockdev_rename(tgt_node, rlist):
4340         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4341       # now we rename the new LVs to the old LVs
4342       info("renaming the new LVs on the target node")
4343       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4344       if not rpc.call_blockdev_rename(tgt_node, rlist):
4345         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4346
4347       for old, new in zip(old_lvs, new_lvs):
4348         new.logical_id = old.logical_id
4349         cfg.SetDiskID(new, tgt_node)
4350
4351       for disk in old_lvs:
4352         disk.logical_id = ren_fn(disk, temp_suffix)
4353         cfg.SetDiskID(disk, tgt_node)
4354
4355       # now that the new lvs have the old name, we can add them to the device
4356       info("adding new mirror component on %s" % tgt_node)
4357       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4358         for new_lv in new_lvs:
4359           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4360             warning("Can't rollback device %s", hint="manually cleanup unused"
4361                     " logical volumes")
4362         raise errors.OpExecError("Can't add local storage to drbd")
4363
4364       dev.children = new_lvs
4365       cfg.Update(instance)
4366
4367     # Step: wait for sync
4368
4369     # this can fail as the old devices are degraded and _WaitForSync
4370     # does a combined result over all disks, so we don't check its
4371     # return value
4372     self.proc.LogStep(5, steps_total, "sync devices")
4373     _WaitForSync(cfg, instance, self.proc, unlock=True)
4374
4375     # so check manually all the devices
4376     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4377       cfg.SetDiskID(dev, instance.primary_node)
4378       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4379       if is_degr:
4380         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4381
4382     # Step: remove old storage
4383     self.proc.LogStep(6, steps_total, "removing old storage")
4384     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4385       info("remove logical volumes for %s" % name)
4386       for lv in old_lvs:
4387         cfg.SetDiskID(lv, tgt_node)
4388         if not rpc.call_blockdev_remove(tgt_node, lv):
4389           warning("Can't remove old LV", hint="manually remove unused LVs")
4390           continue
4391
4392   def _ExecD8Secondary(self, feedback_fn):
4393     """Replace the secondary node for drbd8.
4394
4395     The algorithm for replace is quite complicated:
4396       - for all disks of the instance:
4397         - create new LVs on the new node with same names
4398         - shutdown the drbd device on the old secondary
4399         - disconnect the drbd network on the primary
4400         - create the drbd device on the new secondary
4401         - network attach the drbd on the primary, using an artifice:
4402           the drbd code for Attach() will connect to the network if it
4403           finds a device which is connected to the good local disks but
4404           not network enabled
4405       - wait for sync across all devices
4406       - remove all disks from the old secondary
4407
4408     Failures are not very well handled.
4409
4410     """
4411     steps_total = 6
4412     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4413     instance = self.instance
4414     iv_names = {}
4415     vgname = self.cfg.GetVGName()
4416     # start of work
4417     cfg = self.cfg
4418     old_node = self.tgt_node
4419     new_node = self.new_node
4420     pri_node = instance.primary_node
4421
4422     # Step: check device activation
4423     self.proc.LogStep(1, steps_total, "check device existence")
4424     info("checking volume groups")
4425     my_vg = cfg.GetVGName()
4426     results = rpc.call_vg_list([pri_node, new_node])
4427     if not results:
4428       raise errors.OpExecError("Can't list volume groups on the nodes")
4429     for node in pri_node, new_node:
4430       res = results.get(node, False)
4431       if not res or my_vg not in res:
4432         raise errors.OpExecError("Volume group '%s' not found on %s" %
4433                                  (my_vg, node))
4434     for dev in instance.disks:
4435       if not dev.iv_name in self.op.disks:
4436         continue
4437       info("checking %s on %s" % (dev.iv_name, pri_node))
4438       cfg.SetDiskID(dev, pri_node)
4439       if not rpc.call_blockdev_find(pri_node, dev):
4440         raise errors.OpExecError("Can't find device %s on node %s" %
4441                                  (dev.iv_name, pri_node))
4442
4443     # Step: check other node consistency
4444     self.proc.LogStep(2, steps_total, "check peer consistency")
4445     for dev in instance.disks:
4446       if not dev.iv_name in self.op.disks:
4447         continue
4448       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4449       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4450         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4451                                  " unsafe to replace the secondary" %
4452                                  pri_node)
4453
4454     # Step: create new storage
4455     self.proc.LogStep(3, steps_total, "allocate new storage")
4456     for dev in instance.disks:
4457       size = dev.size
4458       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4459       # since we *always* want to create this LV, we use the
4460       # _Create...OnPrimary (which forces the creation), even if we
4461       # are talking about the secondary node
4462       for new_lv in dev.children:
4463         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4464                                         _GetInstanceInfoText(instance)):
4465           raise errors.OpExecError("Failed to create new LV named '%s' on"
4466                                    " node '%s'" %
4467                                    (new_lv.logical_id[1], new_node))
4468
4469       iv_names[dev.iv_name] = (dev, dev.children)
4470
4471     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4472     for dev in instance.disks:
4473       size = dev.size
4474       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4475       # create new devices on new_node
4476       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4477                               logical_id=(pri_node, new_node,
4478                                           dev.logical_id[2]),
4479                               children=dev.children)
4480       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4481                                         new_drbd, False,
4482                                       _GetInstanceInfoText(instance)):
4483         raise errors.OpExecError("Failed to create new DRBD on"
4484                                  " node '%s'" % new_node)
4485
4486     for dev in instance.disks:
4487       # we have new devices, shutdown the drbd on the old secondary
4488       info("shutting down drbd for %s on old node" % dev.iv_name)
4489       cfg.SetDiskID(dev, old_node)
4490       if not rpc.call_blockdev_shutdown(old_node, dev):
4491         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4492                 hint="Please cleanup this device manually as soon as possible")
4493
4494     info("detaching primary drbds from the network (=> standalone)")
4495     done = 0
4496     for dev in instance.disks:
4497       cfg.SetDiskID(dev, pri_node)
4498       # set the physical (unique in bdev terms) id to None, meaning
4499       # detach from network
4500       dev.physical_id = (None,) * len(dev.physical_id)
4501       # and 'find' the device, which will 'fix' it to match the
4502       # standalone state
4503       if rpc.call_blockdev_find(pri_node, dev):
4504         done += 1
4505       else:
4506         warning("Failed to detach drbd %s from network, unusual case" %
4507                 dev.iv_name)
4508
4509     if not done:
4510       # no detaches succeeded (very unlikely)
4511       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4512
4513     # if we managed to detach at least one, we update all the disks of
4514     # the instance to point to the new secondary
4515     info("updating instance configuration")
4516     for dev in instance.disks:
4517       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4518       cfg.SetDiskID(dev, pri_node)
4519     cfg.Update(instance)
4520
4521     # and now perform the drbd attach
4522     info("attaching primary drbds to new secondary (standalone => connected)")
4523     failures = []
4524     for dev in instance.disks:
4525       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4526       # since the attach is smart, it's enough to 'find' the device,
4527       # it will automatically activate the network, if the physical_id
4528       # is correct
4529       cfg.SetDiskID(dev, pri_node)
4530       if not rpc.call_blockdev_find(pri_node, dev):
4531         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4532                 "please do a gnt-instance info to see the status of disks")
4533
4534     # this can fail as the old devices are degraded and _WaitForSync
4535     # does a combined result over all disks, so we don't check its
4536     # return value
4537     self.proc.LogStep(5, steps_total, "sync devices")
4538     _WaitForSync(cfg, instance, self.proc, unlock=True)
4539
4540     # so check manually all the devices
4541     for name, (dev, old_lvs) in iv_names.iteritems():
4542       cfg.SetDiskID(dev, pri_node)
4543       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4544       if is_degr:
4545         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4546
4547     self.proc.LogStep(6, steps_total, "removing old storage")
4548     for name, (dev, old_lvs) in iv_names.iteritems():
4549       info("remove logical volumes for %s" % name)
4550       for lv in old_lvs:
4551         cfg.SetDiskID(lv, old_node)
4552         if not rpc.call_blockdev_remove(old_node, lv):
4553           warning("Can't remove LV on old secondary",
4554                   hint="Cleanup stale volumes by hand")
4555
4556   def Exec(self, feedback_fn):
4557     """Execute disk replacement.
4558
4559     This dispatches the disk replacement to the appropriate handler.
4560
4561     """
4562     instance = self.instance
4563
4564     # Activate the instance disks if we're replacing them on a down instance
4565     if instance.status == "down":
4566       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4567       self.proc.ChainOpCode(op)
4568
4569     if instance.disk_template == constants.DT_REMOTE_RAID1:
4570       fn = self._ExecRR1
4571     elif instance.disk_template == constants.DT_DRBD8:
4572       if self.op.remote_node is None:
4573         fn = self._ExecD8DiskOnly
4574       else:
4575         fn = self._ExecD8Secondary
4576     else:
4577       raise errors.ProgrammerError("Unhandled disk replacement case")
4578
4579     ret = fn(feedback_fn)
4580
4581     # Deactivate the instance disks if we're replacing them on a down instance
4582     if instance.status == "down":
4583       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4584       self.proc.ChainOpCode(op)
4585
4586     return ret
4587
4588
4589 class LUGrowDisk(LogicalUnit):
4590   """Grow a disk of an instance.
4591
4592   """
4593   HPATH = "disk-grow"
4594   HTYPE = constants.HTYPE_INSTANCE
4595   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4596
4597   def BuildHooksEnv(self):
4598     """Build hooks env.
4599
4600     This runs on the master, the primary and all the secondaries.
4601
4602     """
4603     env = {
4604       "DISK": self.op.disk,
4605       "AMOUNT": self.op.amount,
4606       }
4607     env.update(_BuildInstanceHookEnvByObject(self.instance))
4608     nl = [
4609       self.sstore.GetMasterNode(),
4610       self.instance.primary_node,
4611       ]
4612     return env, nl, nl
4613
4614   def CheckPrereq(self):
4615     """Check prerequisites.
4616
4617     This checks that the instance is in the cluster.
4618
4619     """
4620     instance = self.cfg.GetInstanceInfo(
4621       self.cfg.ExpandInstanceName(self.op.instance_name))
4622     if instance is None:
4623       raise errors.OpPrereqError("Instance '%s' not known" %
4624                                  self.op.instance_name)
4625
4626     if self.op.amount <= 0:
4627       raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4628
4629     self.instance = instance
4630     self.op.instance_name = instance.name
4631
4632     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4633       raise errors.OpPrereqError("Instance's disk layout does not support"
4634                                  " growing.")
4635
4636     self.disk = instance.FindDisk(self.op.disk)
4637     if self.disk is None:
4638       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4639                                  (self.op.disk, instance.name))
4640
4641     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4642     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4643     for node in nodenames:
4644       info = nodeinfo.get(node, None)
4645       if not info:
4646         raise errors.OpPrereqError("Cannot get current information"
4647                                    " from node '%s'" % node)
4648       vg_free = info.get('vg_free', None)
4649       if not isinstance(vg_free, int):
4650         raise errors.OpPrereqError("Can't compute free disk space on"
4651                                    " node %s" % node)
4652       if self.op.amount > info['vg_free']:
4653         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4654                                    " %d MiB available, %d MiB required" %
4655                                    (node, info['vg_free'], self.op.amount))
4656       is_primary = (node == instance.primary_node)
4657       if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4658         raise errors.OpPrereqError("Disk %s is degraded or not fully"
4659                                  " synchronized on node %s,"
4660                                  " aborting grow." % (self.op.disk, node))
4661
4662   def Exec(self, feedback_fn):
4663     """Execute disk grow.
4664
4665     """
4666     instance = self.instance
4667     disk = self.disk
4668     for node in (instance.secondary_nodes + (instance.primary_node,)):
4669       self.cfg.SetDiskID(disk, node)
4670       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4671       if not result or not isinstance(result, tuple) or len(result) != 2:
4672         raise errors.OpExecError("grow request failed to node %s" % node)
4673       elif not result[0]:
4674         raise errors.OpExecError("grow request failed to node %s: %s" %
4675                                  (node, result[1]))
4676     disk.RecordGrow(self.op.amount)
4677     self.cfg.Update(instance)
4678     if self.op.wait_for_sync:
4679       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4680       if disk_abort:
4681         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4682                      " Please check the instance.")
4683
4684
4685 class LUQueryInstanceData(NoHooksLU):
4686   """Query runtime instance data.
4687
4688   """
4689   _OP_REQP = ["instances"]
4690
4691   def CheckPrereq(self):
4692     """Check prerequisites.
4693
4694     This only checks the optional instance list against the existing names.
4695
4696     """
4697     if not isinstance(self.op.instances, list):
4698       raise errors.OpPrereqError("Invalid argument type 'instances'")
4699     if self.op.instances:
4700       self.wanted_instances = []
4701       names = self.op.instances
4702       for name in names:
4703         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4704         if instance is None:
4705           raise errors.OpPrereqError("No such instance name '%s'" % name)
4706         self.wanted_instances.append(instance)
4707     else:
4708       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4709                                in self.cfg.GetInstanceList()]
4710     return
4711
4712
4713   def _ComputeDiskStatus(self, instance, snode, dev):
4714     """Compute block device status.
4715
4716     """
4717     self.cfg.SetDiskID(dev, instance.primary_node)
4718     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4719     if dev.dev_type in constants.LDS_DRBD:
4720       # we change the snode then (otherwise we use the one passed in)
4721       if dev.logical_id[0] == instance.primary_node:
4722         snode = dev.logical_id[1]
4723       else:
4724         snode = dev.logical_id[0]
4725
4726     if snode:
4727       self.cfg.SetDiskID(dev, snode)
4728       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4729     else:
4730       dev_sstatus = None
4731
4732     if dev.children:
4733       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4734                       for child in dev.children]
4735     else:
4736       dev_children = []
4737
4738     data = {
4739       "iv_name": dev.iv_name,
4740       "dev_type": dev.dev_type,
4741       "logical_id": dev.logical_id,
4742       "physical_id": dev.physical_id,
4743       "pstatus": dev_pstatus,
4744       "sstatus": dev_sstatus,
4745       "children": dev_children,
4746       }
4747
4748     return data
4749
4750   def Exec(self, feedback_fn):
4751     """Gather and return data"""
4752     result = {}
4753     for instance in self.wanted_instances:
4754       remote_info = rpc.call_instance_info(instance.primary_node,
4755                                                 instance.name)
4756       if remote_info and "state" in remote_info:
4757         remote_state = "up"
4758       else:
4759         remote_state = "down"
4760       if instance.status == "down":
4761         config_state = "down"
4762       else:
4763         config_state = "up"
4764
4765       disks = [self._ComputeDiskStatus(instance, None, device)
4766                for device in instance.disks]
4767
4768       idict = {
4769         "name": instance.name,
4770         "config_state": config_state,
4771         "run_state": remote_state,
4772         "pnode": instance.primary_node,
4773         "snodes": instance.secondary_nodes,
4774         "os": instance.os,
4775         "memory": instance.memory,
4776         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4777         "disks": disks,
4778         "vcpus": instance.vcpus,
4779         }
4780
4781       htkind = self.sstore.GetHypervisorType()
4782       if htkind == constants.HT_XEN_PVM30:
4783         idict["kernel_path"] = instance.kernel_path
4784         idict["initrd_path"] = instance.initrd_path
4785
4786       if htkind == constants.HT_XEN_HVM31:
4787         idict["hvm_boot_order"] = instance.hvm_boot_order
4788         idict["hvm_acpi"] = instance.hvm_acpi
4789         idict["hvm_pae"] = instance.hvm_pae
4790         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4791
4792       if htkind in constants.HTS_REQ_PORT:
4793         idict["vnc_bind_address"] = instance.vnc_bind_address
4794         idict["network_port"] = instance.network_port
4795
4796       result[instance.name] = idict
4797
4798     return result
4799
4800
4801 class LUSetInstanceParms(LogicalUnit):
4802   """Modifies an instances's parameters.
4803
4804   """
4805   HPATH = "instance-modify"
4806   HTYPE = constants.HTYPE_INSTANCE
4807   _OP_REQP = ["instance_name"]
4808
4809   def BuildHooksEnv(self):
4810     """Build hooks env.
4811
4812     This runs on the master, primary and secondaries.
4813
4814     """
4815     args = dict()
4816     if self.mem:
4817       args['memory'] = self.mem
4818     if self.vcpus:
4819       args['vcpus'] = self.vcpus
4820     if self.do_ip or self.do_bridge or self.mac:
4821       if self.do_ip:
4822         ip = self.ip
4823       else:
4824         ip = self.instance.nics[0].ip
4825       if self.bridge:
4826         bridge = self.bridge
4827       else:
4828         bridge = self.instance.nics[0].bridge
4829       if self.mac:
4830         mac = self.mac
4831       else:
4832         mac = self.instance.nics[0].mac
4833       args['nics'] = [(ip, bridge, mac)]
4834     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4835     nl = [self.sstore.GetMasterNode(),
4836           self.instance.primary_node] + list(self.instance.secondary_nodes)
4837     return env, nl, nl
4838
4839   def CheckPrereq(self):
4840     """Check prerequisites.
4841
4842     This only checks the instance list against the existing names.
4843
4844     """
4845     self.mem = getattr(self.op, "mem", None)
4846     self.vcpus = getattr(self.op, "vcpus", None)
4847     self.ip = getattr(self.op, "ip", None)
4848     self.mac = getattr(self.op, "mac", None)
4849     self.bridge = getattr(self.op, "bridge", None)
4850     self.kernel_path = getattr(self.op, "kernel_path", None)
4851     self.initrd_path = getattr(self.op, "initrd_path", None)
4852     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4853     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4854     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4855     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4856     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4857     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4858                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4859                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4860                  self.vnc_bind_address]
4861     if all_parms.count(None) == len(all_parms):
4862       raise errors.OpPrereqError("No changes submitted")
4863     if self.mem is not None:
4864       try:
4865         self.mem = int(self.mem)
4866       except ValueError, err:
4867         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4868     if self.vcpus is not None:
4869       try:
4870         self.vcpus = int(self.vcpus)
4871       except ValueError, err:
4872         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4873     if self.ip is not None:
4874       self.do_ip = True
4875       if self.ip.lower() == "none":
4876         self.ip = None
4877       else:
4878         if not utils.IsValidIP(self.ip):
4879           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4880     else:
4881       self.do_ip = False
4882     self.do_bridge = (self.bridge is not None)
4883     if self.mac is not None:
4884       if self.cfg.IsMacInUse(self.mac):
4885         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4886                                    self.mac)
4887       if not utils.IsValidMac(self.mac):
4888         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4889
4890     if self.kernel_path is not None:
4891       self.do_kernel_path = True
4892       if self.kernel_path == constants.VALUE_NONE:
4893         raise errors.OpPrereqError("Can't set instance to no kernel")
4894
4895       if self.kernel_path != constants.VALUE_DEFAULT:
4896         if not os.path.isabs(self.kernel_path):
4897           raise errors.OpPrereqError("The kernel path must be an absolute"
4898                                     " filename")
4899     else:
4900       self.do_kernel_path = False
4901
4902     if self.initrd_path is not None:
4903       self.do_initrd_path = True
4904       if self.initrd_path not in (constants.VALUE_NONE,
4905                                   constants.VALUE_DEFAULT):
4906         if not os.path.isabs(self.initrd_path):
4907           raise errors.OpPrereqError("The initrd path must be an absolute"
4908                                     " filename")
4909     else:
4910       self.do_initrd_path = False
4911
4912     # boot order verification
4913     if self.hvm_boot_order is not None:
4914       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4915         if len(self.hvm_boot_order.strip("acdn")) != 0:
4916           raise errors.OpPrereqError("invalid boot order specified,"
4917                                      " must be one or more of [acdn]"
4918                                      " or 'default'")
4919
4920     # hvm_cdrom_image_path verification
4921     if self.op.hvm_cdrom_image_path is not None:
4922       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4923               self.op.hvm_cdrom_image_path.lower() == "none"):
4924         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4925                                    " be an absolute path or None, not %s" %
4926                                    self.op.hvm_cdrom_image_path)
4927       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4928               self.op.hvm_cdrom_image_path.lower() == "none"):
4929         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4930                                    " regular file or a symlink pointing to"
4931                                    " an existing regular file, not %s" %
4932                                    self.op.hvm_cdrom_image_path)
4933
4934     # vnc_bind_address verification
4935     if self.op.vnc_bind_address is not None:
4936       if not utils.IsValidIP(self.op.vnc_bind_address):
4937         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4938                                    " like a valid IP address" %
4939                                    self.op.vnc_bind_address)
4940
4941     instance = self.cfg.GetInstanceInfo(
4942       self.cfg.ExpandInstanceName(self.op.instance_name))
4943     if instance is None:
4944       raise errors.OpPrereqError("No such instance name '%s'" %
4945                                  self.op.instance_name)
4946     self.op.instance_name = instance.name
4947     self.instance = instance
4948     return
4949
4950   def Exec(self, feedback_fn):
4951     """Modifies an instance.
4952
4953     All parameters take effect only at the next restart of the instance.
4954     """
4955     result = []
4956     instance = self.instance
4957     if self.mem:
4958       instance.memory = self.mem
4959       result.append(("mem", self.mem))
4960     if self.vcpus:
4961       instance.vcpus = self.vcpus
4962       result.append(("vcpus",  self.vcpus))
4963     if self.do_ip:
4964       instance.nics[0].ip = self.ip
4965       result.append(("ip", self.ip))
4966     if self.bridge:
4967       instance.nics[0].bridge = self.bridge
4968       result.append(("bridge", self.bridge))
4969     if self.mac:
4970       instance.nics[0].mac = self.mac
4971       result.append(("mac", self.mac))
4972     if self.do_kernel_path:
4973       instance.kernel_path = self.kernel_path
4974       result.append(("kernel_path", self.kernel_path))
4975     if self.do_initrd_path:
4976       instance.initrd_path = self.initrd_path
4977       result.append(("initrd_path", self.initrd_path))
4978     if self.hvm_boot_order:
4979       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4980         instance.hvm_boot_order = None
4981       else:
4982         instance.hvm_boot_order = self.hvm_boot_order
4983       result.append(("hvm_boot_order", self.hvm_boot_order))
4984     if self.hvm_acpi is not None:
4985       instance.hvm_acpi = self.hvm_acpi
4986       result.append(("hvm_acpi", self.hvm_acpi))
4987     if self.hvm_pae is not None:
4988       instance.hvm_pae = self.hvm_pae
4989       result.append(("hvm_pae", self.hvm_pae))
4990     if self.hvm_cdrom_image_path:
4991       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4992         instance.hvm_cdrom_image_path = None
4993       else:
4994         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4995       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4996     if self.vnc_bind_address:
4997       instance.vnc_bind_address = self.vnc_bind_address
4998       result.append(("vnc_bind_address", self.vnc_bind_address))
4999
5000     self.cfg.AddInstance(instance)
5001
5002     return result
5003
5004
5005 class LUQueryExports(NoHooksLU):
5006   """Query the exports list
5007
5008   """
5009   _OP_REQP = []
5010
5011   def CheckPrereq(self):
5012     """Check that the nodelist contains only existing nodes.
5013
5014     """
5015     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5016
5017   def Exec(self, feedback_fn):
5018     """Compute the list of all the exported system images.
5019
5020     Returns:
5021       a dictionary with the structure node->(export-list)
5022       where export-list is a list of the instances exported on
5023       that node.
5024
5025     """
5026     return rpc.call_export_list(self.nodes)
5027
5028
5029 class LUExportInstance(LogicalUnit):
5030   """Export an instance to an image in the cluster.
5031
5032   """
5033   HPATH = "instance-export"
5034   HTYPE = constants.HTYPE_INSTANCE
5035   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5036
5037   def BuildHooksEnv(self):
5038     """Build hooks env.
5039
5040     This will run on the master, primary node and target node.
5041
5042     """
5043     env = {
5044       "EXPORT_NODE": self.op.target_node,
5045       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5046       }
5047     env.update(_BuildInstanceHookEnvByObject(self.instance))
5048     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5049           self.op.target_node]
5050     return env, nl, nl
5051
5052   def CheckPrereq(self):
5053     """Check prerequisites.
5054
5055     This checks that the instance and node names are valid.
5056
5057     """
5058     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5059     self.instance = self.cfg.GetInstanceInfo(instance_name)
5060     if self.instance is None:
5061       raise errors.OpPrereqError("Instance '%s' not found" %
5062                                  self.op.instance_name)
5063
5064     # node verification
5065     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5066     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5067
5068     if self.dst_node is None:
5069       raise errors.OpPrereqError("Destination node '%s' is unknown." %
5070                                  self.op.target_node)
5071     self.op.target_node = self.dst_node.name
5072
5073   def Exec(self, feedback_fn):
5074     """Export an instance to an image in the cluster.
5075
5076     """
5077     instance = self.instance
5078     dst_node = self.dst_node
5079     src_node = instance.primary_node
5080     if self.op.shutdown:
5081       # shutdown the instance, but not the disks
5082       if not rpc.call_instance_shutdown(src_node, instance):
5083         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5084                                  (instance.name, src_node))
5085
5086     vgname = self.cfg.GetVGName()
5087
5088     snap_disks = []
5089
5090     try:
5091       for disk in instance.disks:
5092         if disk.iv_name == "sda":
5093           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5094           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5095
5096           if not new_dev_name:
5097             logger.Error("could not snapshot block device %s on node %s" %
5098                          (disk.logical_id[1], src_node))
5099           else:
5100             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5101                                       logical_id=(vgname, new_dev_name),
5102                                       physical_id=(vgname, new_dev_name),
5103                                       iv_name=disk.iv_name)
5104             snap_disks.append(new_dev)
5105
5106     finally:
5107       if self.op.shutdown and instance.status == "up":
5108         if not rpc.call_instance_start(src_node, instance, None):
5109           _ShutdownInstanceDisks(instance, self.cfg)
5110           raise errors.OpExecError("Could not start instance")
5111
5112     # TODO: check for size
5113
5114     for dev in snap_disks:
5115       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5116                                            instance):
5117         logger.Error("could not export block device %s from node"
5118                      " %s to node %s" %
5119                      (dev.logical_id[1], src_node, dst_node.name))
5120       if not rpc.call_blockdev_remove(src_node, dev):
5121         logger.Error("could not remove snapshot block device %s from"
5122                      " node %s" % (dev.logical_id[1], src_node))
5123
5124     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5125       logger.Error("could not finalize export for instance %s on node %s" %
5126                    (instance.name, dst_node.name))
5127
5128     nodelist = self.cfg.GetNodeList()
5129     nodelist.remove(dst_node.name)
5130
5131     # on one-node clusters nodelist will be empty after the removal
5132     # if we proceed the backup would be removed because OpQueryExports
5133     # substitutes an empty list with the full cluster node list.
5134     if nodelist:
5135       op = opcodes.OpQueryExports(nodes=nodelist)
5136       exportlist = self.proc.ChainOpCode(op)
5137       for node in exportlist:
5138         if instance.name in exportlist[node]:
5139           if not rpc.call_export_remove(node, instance.name):
5140             logger.Error("could not remove older export for instance %s"
5141                          " on node %s" % (instance.name, node))
5142
5143
5144 class LURemoveExport(NoHooksLU):
5145   """Remove exports related to the named instance.
5146
5147   """
5148   _OP_REQP = ["instance_name"]
5149
5150   def CheckPrereq(self):
5151     """Check prerequisites.
5152     """
5153     pass
5154
5155   def Exec(self, feedback_fn):
5156     """Remove any export.
5157
5158     """
5159     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5160     # If the instance was not found we'll try with the name that was passed in.
5161     # This will only work if it was an FQDN, though.
5162     fqdn_warn = False
5163     if not instance_name:
5164       fqdn_warn = True
5165       instance_name = self.op.instance_name
5166
5167     op = opcodes.OpQueryExports(nodes=[])
5168     exportlist = self.proc.ChainOpCode(op)
5169     found = False
5170     for node in exportlist:
5171       if instance_name in exportlist[node]:
5172         found = True
5173         if not rpc.call_export_remove(node, instance_name):
5174           logger.Error("could not remove export for instance %s"
5175                        " on node %s" % (instance_name, node))
5176
5177     if fqdn_warn and not found:
5178       feedback_fn("Export not found. If trying to remove an export belonging"
5179                   " to a deleted instance please use its Fully Qualified"
5180                   " Domain Name.")
5181
5182
5183 class TagsLU(NoHooksLU):
5184   """Generic tags LU.
5185
5186   This is an abstract class which is the parent of all the other tags LUs.
5187
5188   """
5189   def CheckPrereq(self):
5190     """Check prerequisites.
5191
5192     """
5193     if self.op.kind == constants.TAG_CLUSTER:
5194       self.target = self.cfg.GetClusterInfo()
5195     elif self.op.kind == constants.TAG_NODE:
5196       name = self.cfg.ExpandNodeName(self.op.name)
5197       if name is None:
5198         raise errors.OpPrereqError("Invalid node name (%s)" %
5199                                    (self.op.name,))
5200       self.op.name = name
5201       self.target = self.cfg.GetNodeInfo(name)
5202     elif self.op.kind == constants.TAG_INSTANCE:
5203       name = self.cfg.ExpandInstanceName(self.op.name)
5204       if name is None:
5205         raise errors.OpPrereqError("Invalid instance name (%s)" %
5206                                    (self.op.name,))
5207       self.op.name = name
5208       self.target = self.cfg.GetInstanceInfo(name)
5209     else:
5210       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5211                                  str(self.op.kind))
5212
5213
5214 class LUGetTags(TagsLU):
5215   """Returns the tags of a given object.
5216
5217   """
5218   _OP_REQP = ["kind", "name"]
5219
5220   def Exec(self, feedback_fn):
5221     """Returns the tag list.
5222
5223     """
5224     return self.target.GetTags()
5225
5226
5227 class LUSearchTags(NoHooksLU):
5228   """Searches the tags for a given pattern.
5229
5230   """
5231   _OP_REQP = ["pattern"]
5232
5233   def CheckPrereq(self):
5234     """Check prerequisites.
5235
5236     This checks the pattern passed for validity by compiling it.
5237
5238     """
5239     try:
5240       self.re = re.compile(self.op.pattern)
5241     except re.error, err:
5242       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5243                                  (self.op.pattern, err))
5244
5245   def Exec(self, feedback_fn):
5246     """Returns the tag list.
5247
5248     """
5249     cfg = self.cfg
5250     tgts = [("/cluster", cfg.GetClusterInfo())]
5251     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5252     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5253     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5254     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5255     results = []
5256     for path, target in tgts:
5257       for tag in target.GetTags():
5258         if self.re.search(tag):
5259           results.append((path, tag))
5260     return results
5261
5262
5263 class LUAddTags(TagsLU):
5264   """Sets a tag on a given object.
5265
5266   """
5267   _OP_REQP = ["kind", "name", "tags"]
5268
5269   def CheckPrereq(self):
5270     """Check prerequisites.
5271
5272     This checks the type and length of the tag name and value.
5273
5274     """
5275     TagsLU.CheckPrereq(self)
5276     for tag in self.op.tags:
5277       objects.TaggableObject.ValidateTag(tag)
5278
5279   def Exec(self, feedback_fn):
5280     """Sets the tag.
5281
5282     """
5283     try:
5284       for tag in self.op.tags:
5285         self.target.AddTag(tag)
5286     except errors.TagError, err:
5287       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5288     try:
5289       self.cfg.Update(self.target)
5290     except errors.ConfigurationError:
5291       raise errors.OpRetryError("There has been a modification to the"
5292                                 " config file and the operation has been"
5293                                 " aborted. Please retry.")
5294
5295
5296 class LUDelTags(TagsLU):
5297   """Delete a list of tags from a given object.
5298
5299   """
5300   _OP_REQP = ["kind", "name", "tags"]
5301
5302   def CheckPrereq(self):
5303     """Check prerequisites.
5304
5305     This checks that we have the given tag.
5306
5307     """
5308     TagsLU.CheckPrereq(self)
5309     for tag in self.op.tags:
5310       objects.TaggableObject.ValidateTag(tag, removal=True)
5311     del_tags = frozenset(self.op.tags)
5312     cur_tags = self.target.GetTags()
5313     if not del_tags <= cur_tags:
5314       diff_tags = del_tags - cur_tags
5315       diff_names = ["'%s'" % tag for tag in diff_tags]
5316       diff_names.sort()
5317       raise errors.OpPrereqError("Tag(s) %s not found" %
5318                                  (",".join(diff_names)))
5319
5320   def Exec(self, feedback_fn):
5321     """Remove the tag from the object.
5322
5323     """
5324     for tag in self.op.tags:
5325       self.target.RemoveTag(tag)
5326     try:
5327       self.cfg.Update(self.target)
5328     except errors.ConfigurationError:
5329       raise errors.OpRetryError("There has been a modification to the"
5330                                 " config file and the operation has been"
5331                                 " aborted. Please retry.")
5332
5333 class LUTestDelay(NoHooksLU):
5334   """Sleep for a specified amount of time.
5335
5336   This LU sleeps on the master and/or nodes for a specified amoutn of
5337   time.
5338
5339   """
5340   _OP_REQP = ["duration", "on_master", "on_nodes"]
5341
5342   def CheckPrereq(self):
5343     """Check prerequisites.
5344
5345     This checks that we have a good list of nodes and/or the duration
5346     is valid.
5347
5348     """
5349
5350     if self.op.on_nodes:
5351       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5352
5353   def Exec(self, feedback_fn):
5354     """Do the actual sleep.
5355
5356     """
5357     if self.op.on_master:
5358       if not utils.TestDelay(self.op.duration):
5359         raise errors.OpExecError("Error during master delay test")
5360     if self.op.on_nodes:
5361       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5362       if not result:
5363         raise errors.OpExecError("Complete failure from rpc call")
5364       for node, node_result in result.items():
5365         if not node_result:
5366           raise errors.OpExecError("Failure during rpc call to node %s,"
5367                                    " result: %s" % (node, node_result))
5368
5369
5370 class IAllocator(object):
5371   """IAllocator framework.
5372
5373   An IAllocator instance has three sets of attributes:
5374     - cfg/sstore that are needed to query the cluster
5375     - input data (all members of the _KEYS class attribute are required)
5376     - four buffer attributes (in|out_data|text), that represent the
5377       input (to the external script) in text and data structure format,
5378       and the output from it, again in two formats
5379     - the result variables from the script (success, info, nodes) for
5380       easy usage
5381
5382   """
5383   _ALLO_KEYS = [
5384     "mem_size", "disks", "disk_template",
5385     "os", "tags", "nics", "vcpus",
5386     ]
5387   _RELO_KEYS = [
5388     "relocate_from",
5389     ]
5390
5391   def __init__(self, cfg, sstore, mode, name, **kwargs):
5392     self.cfg = cfg
5393     self.sstore = sstore
5394     # init buffer variables
5395     self.in_text = self.out_text = self.in_data = self.out_data = None
5396     # init all input fields so that pylint is happy
5397     self.mode = mode
5398     self.name = name
5399     self.mem_size = self.disks = self.disk_template = None
5400     self.os = self.tags = self.nics = self.vcpus = None
5401     self.relocate_from = None
5402     # computed fields
5403     self.required_nodes = None
5404     # init result fields
5405     self.success = self.info = self.nodes = None
5406     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5407       keyset = self._ALLO_KEYS
5408     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5409       keyset = self._RELO_KEYS
5410     else:
5411       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5412                                    " IAllocator" % self.mode)
5413     for key in kwargs:
5414       if key not in keyset:
5415         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5416                                      " IAllocator" % key)
5417       setattr(self, key, kwargs[key])
5418     for key in keyset:
5419       if key not in kwargs:
5420         raise errors.ProgrammerError("Missing input parameter '%s' to"
5421                                      " IAllocator" % key)
5422     self._BuildInputData()
5423
5424   def _ComputeClusterData(self):
5425     """Compute the generic allocator input data.
5426
5427     This is the data that is independent of the actual operation.
5428
5429     """
5430     cfg = self.cfg
5431     # cluster data
5432     data = {
5433       "version": 1,
5434       "cluster_name": self.sstore.GetClusterName(),
5435       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5436       "hypervisor_type": self.sstore.GetHypervisorType(),
5437       # we don't have job IDs
5438       }
5439
5440     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5441
5442     # node data
5443     node_results = {}
5444     node_list = cfg.GetNodeList()
5445     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5446     for nname in node_list:
5447       ninfo = cfg.GetNodeInfo(nname)
5448       if nname not in node_data or not isinstance(node_data[nname], dict):
5449         raise errors.OpExecError("Can't get data for node %s" % nname)
5450       remote_info = node_data[nname]
5451       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5452                    'vg_size', 'vg_free', 'cpu_total']:
5453         if attr not in remote_info:
5454           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5455                                    (nname, attr))
5456         try:
5457           remote_info[attr] = int(remote_info[attr])
5458         except ValueError, err:
5459           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5460                                    " %s" % (nname, attr, str(err)))
5461       # compute memory used by primary instances
5462       i_p_mem = i_p_up_mem = 0
5463       for iinfo in i_list:
5464         if iinfo.primary_node == nname:
5465           i_p_mem += iinfo.memory
5466           if iinfo.status == "up":
5467             i_p_up_mem += iinfo.memory
5468
5469       # compute memory used by instances
5470       pnr = {
5471         "tags": list(ninfo.GetTags()),
5472         "total_memory": remote_info['memory_total'],
5473         "reserved_memory": remote_info['memory_dom0'],
5474         "free_memory": remote_info['memory_free'],
5475         "i_pri_memory": i_p_mem,
5476         "i_pri_up_memory": i_p_up_mem,
5477         "total_disk": remote_info['vg_size'],
5478         "free_disk": remote_info['vg_free'],
5479         "primary_ip": ninfo.primary_ip,
5480         "secondary_ip": ninfo.secondary_ip,
5481         "total_cpus": remote_info['cpu_total'],
5482         }
5483       node_results[nname] = pnr
5484     data["nodes"] = node_results
5485
5486     # instance data
5487     instance_data = {}
5488     for iinfo in i_list:
5489       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5490                   for n in iinfo.nics]
5491       pir = {
5492         "tags": list(iinfo.GetTags()),
5493         "should_run": iinfo.status == "up",
5494         "vcpus": iinfo.vcpus,
5495         "memory": iinfo.memory,
5496         "os": iinfo.os,
5497         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5498         "nics": nic_data,
5499         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5500         "disk_template": iinfo.disk_template,
5501         }
5502       instance_data[iinfo.name] = pir
5503
5504     data["instances"] = instance_data
5505
5506     self.in_data = data
5507
5508   def _AddNewInstance(self):
5509     """Add new instance data to allocator structure.
5510
5511     This in combination with _AllocatorGetClusterData will create the
5512     correct structure needed as input for the allocator.
5513
5514     The checks for the completeness of the opcode must have already been
5515     done.
5516
5517     """
5518     data = self.in_data
5519     if len(self.disks) != 2:
5520       raise errors.OpExecError("Only two-disk configurations supported")
5521
5522     disk_space = _ComputeDiskSize(self.disk_template,
5523                                   self.disks[0]["size"], self.disks[1]["size"])
5524
5525     if self.disk_template in constants.DTS_NET_MIRROR:
5526       self.required_nodes = 2
5527     else:
5528       self.required_nodes = 1
5529     request = {
5530       "type": "allocate",
5531       "name": self.name,
5532       "disk_template": self.disk_template,
5533       "tags": self.tags,
5534       "os": self.os,
5535       "vcpus": self.vcpus,
5536       "memory": self.mem_size,
5537       "disks": self.disks,
5538       "disk_space_total": disk_space,
5539       "nics": self.nics,
5540       "required_nodes": self.required_nodes,
5541       }
5542     data["request"] = request
5543
5544   def _AddRelocateInstance(self):
5545     """Add relocate instance data to allocator structure.
5546
5547     This in combination with _IAllocatorGetClusterData will create the
5548     correct structure needed as input for the allocator.
5549
5550     The checks for the completeness of the opcode must have already been
5551     done.
5552
5553     """
5554     instance = self.cfg.GetInstanceInfo(self.name)
5555     if instance is None:
5556       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5557                                    " IAllocator" % self.name)
5558
5559     if instance.disk_template not in constants.DTS_NET_MIRROR:
5560       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5561
5562     if len(instance.secondary_nodes) != 1:
5563       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5564
5565     self.required_nodes = 1
5566
5567     disk_space = _ComputeDiskSize(instance.disk_template,
5568                                   instance.disks[0].size,
5569                                   instance.disks[1].size)
5570
5571     request = {
5572       "type": "relocate",
5573       "name": self.name,
5574       "disk_space_total": disk_space,
5575       "required_nodes": self.required_nodes,
5576       "relocate_from": self.relocate_from,
5577       }
5578     self.in_data["request"] = request
5579
5580   def _BuildInputData(self):
5581     """Build input data structures.
5582
5583     """
5584     self._ComputeClusterData()
5585
5586     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5587       self._AddNewInstance()
5588     else:
5589       self._AddRelocateInstance()
5590
5591     self.in_text = serializer.Dump(self.in_data)
5592
5593   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5594     """Run an instance allocator and return the results.
5595
5596     """
5597     data = self.in_text
5598
5599     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5600
5601     if not isinstance(result, tuple) or len(result) != 4:
5602       raise errors.OpExecError("Invalid result from master iallocator runner")
5603
5604     rcode, stdout, stderr, fail = result
5605
5606     if rcode == constants.IARUN_NOTFOUND:
5607       raise errors.OpExecError("Can't find allocator '%s'" % name)
5608     elif rcode == constants.IARUN_FAILURE:
5609         raise errors.OpExecError("Instance allocator call failed: %s,"
5610                                  " output: %s" %
5611                                  (fail, stdout+stderr))
5612     self.out_text = stdout
5613     if validate:
5614       self._ValidateResult()
5615
5616   def _ValidateResult(self):
5617     """Process the allocator results.
5618
5619     This will process and if successful save the result in
5620     self.out_data and the other parameters.
5621
5622     """
5623     try:
5624       rdict = serializer.Load(self.out_text)
5625     except Exception, err:
5626       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5627
5628     if not isinstance(rdict, dict):
5629       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5630
5631     for key in "success", "info", "nodes":
5632       if key not in rdict:
5633         raise errors.OpExecError("Can't parse iallocator results:"
5634                                  " missing key '%s'" % key)
5635       setattr(self, key, rdict[key])
5636
5637     if not isinstance(rdict["nodes"], list):
5638       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5639                                " is not a list")
5640     self.out_data = rdict
5641
5642
5643 class LUTestAllocator(NoHooksLU):
5644   """Run allocator tests.
5645
5646   This LU runs the allocator tests
5647
5648   """
5649   _OP_REQP = ["direction", "mode", "name"]
5650
5651   def CheckPrereq(self):
5652     """Check prerequisites.
5653
5654     This checks the opcode parameters depending on the director and mode test.
5655
5656     """
5657     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5658       for attr in ["name", "mem_size", "disks", "disk_template",
5659                    "os", "tags", "nics", "vcpus"]:
5660         if not hasattr(self.op, attr):
5661           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5662                                      attr)
5663       iname = self.cfg.ExpandInstanceName(self.op.name)
5664       if iname is not None:
5665         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5666                                    iname)
5667       if not isinstance(self.op.nics, list):
5668         raise errors.OpPrereqError("Invalid parameter 'nics'")
5669       for row in self.op.nics:
5670         if (not isinstance(row, dict) or
5671             "mac" not in row or
5672             "ip" not in row or
5673             "bridge" not in row):
5674           raise errors.OpPrereqError("Invalid contents of the"
5675                                      " 'nics' parameter")
5676       if not isinstance(self.op.disks, list):
5677         raise errors.OpPrereqError("Invalid parameter 'disks'")
5678       if len(self.op.disks) != 2:
5679         raise errors.OpPrereqError("Only two-disk configurations supported")
5680       for row in self.op.disks:
5681         if (not isinstance(row, dict) or
5682             "size" not in row or
5683             not isinstance(row["size"], int) or
5684             "mode" not in row or
5685             row["mode"] not in ['r', 'w']):
5686           raise errors.OpPrereqError("Invalid contents of the"
5687                                      " 'disks' parameter")
5688     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5689       if not hasattr(self.op, "name"):
5690         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5691       fname = self.cfg.ExpandInstanceName(self.op.name)
5692       if fname is None:
5693         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5694                                    self.op.name)
5695       self.op.name = fname
5696       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5697     else:
5698       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5699                                  self.op.mode)
5700
5701     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5702       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5703         raise errors.OpPrereqError("Missing allocator name")
5704     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5705       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5706                                  self.op.direction)
5707
5708   def Exec(self, feedback_fn):
5709     """Run the allocator test.
5710
5711     """
5712     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5713       ial = IAllocator(self.cfg, self.sstore,
5714                        mode=self.op.mode,
5715                        name=self.op.name,
5716                        mem_size=self.op.mem_size,
5717                        disks=self.op.disks,
5718                        disk_template=self.op.disk_template,
5719                        os=self.op.os,
5720                        tags=self.op.tags,
5721                        nics=self.op.nics,
5722                        vcpus=self.op.vcpus,
5723                        )
5724     else:
5725       ial = IAllocator(self.cfg, self.sstore,
5726                        mode=self.op.mode,
5727                        name=self.op.name,
5728                        relocate_from=list(self.relocate_from),
5729                        )
5730
5731     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5732       result = ial.in_text
5733     else:
5734       ial.Run(self.op.allocator, validate=False)
5735       result = ial.out_text
5736     return result