Change the order of config updates in some LUs
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.proc = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     self.__ssh = None
78
79     for attr_name in self._OP_REQP:
80       attr_val = getattr(op, attr_name, None)
81       if attr_val is None:
82         raise errors.OpPrereqError("Required parameter '%s' missing" %
83                                    attr_name)
84     if self.REQ_CLUSTER:
85       if not cfg.IsCluster():
86         raise errors.OpPrereqError("Cluster not initialized yet,"
87                                    " use 'gnt-cluster init' first.")
88       if self.REQ_MASTER:
89         master = sstore.GetMasterNode()
90         if master != utils.HostInfo().name:
91           raise errors.OpPrereqError("Commands must be run on the master"
92                                      " node %s" % master)
93
94   def __GetSSH(self):
95     """Returns the SshRunner object
96
97     """
98     if not self.__ssh:
99       self.__ssh = ssh.SshRunner(self.sstore)
100     return self.__ssh
101
102   ssh = property(fget=__GetSSH)
103
104   def CheckPrereq(self):
105     """Check prerequisites for this LU.
106
107     This method should check that the prerequisites for the execution
108     of this LU are fulfilled. It can do internode communication, but
109     it should be idempotent - no cluster or system changes are
110     allowed.
111
112     The method should raise errors.OpPrereqError in case something is
113     not fulfilled. Its return value is ignored.
114
115     This method should also update all the parameters of the opcode to
116     their canonical form; e.g. a short node name must be fully
117     expanded after this method has successfully completed (so that
118     hooks, logging, etc. work correctly).
119
120     """
121     raise NotImplementedError
122
123   def Exec(self, feedback_fn):
124     """Execute the LU.
125
126     This method should implement the actual work. It should raise
127     errors.OpExecError for failures that are somewhat dealt with in
128     code, or expected.
129
130     """
131     raise NotImplementedError
132
133   def BuildHooksEnv(self):
134     """Build hooks environment for this LU.
135
136     This method should return a three-node tuple consisting of: a dict
137     containing the environment that will be used for running the
138     specific hook for this LU, a list of node names on which the hook
139     should run before the execution, and a list of node names on which
140     the hook should run after the execution.
141
142     The keys of the dict must not have 'GANETI_' prefixed as this will
143     be handled in the hooks runner. Also note additional keys will be
144     added by the hooks runner. If the LU doesn't define any
145     environment, an empty dict (and not None) should be returned.
146
147     As for the node lists, the master should not be included in the
148     them, as it will be added by the hooks runner in case this LU
149     requires a cluster to run on (otherwise we don't have a node
150     list). No nodes should be returned as an empty list (and not
151     None).
152
153     Note that if the HPATH for a LU class is None, this function will
154     not be called.
155
156     """
157     raise NotImplementedError
158
159
160 class NoHooksLU(LogicalUnit):
161   """Simple LU which runs no hooks.
162
163   This LU is intended as a parent for other LogicalUnits which will
164   run no hooks, in order to reduce duplicate code.
165
166   """
167   HPATH = None
168   HTYPE = None
169
170   def BuildHooksEnv(self):
171     """Build hooks env.
172
173     This is a no-op, since we don't run hooks.
174
175     """
176     return {}, [], []
177
178
179 def _AddHostToEtcHosts(hostname):
180   """Wrapper around utils.SetEtcHostsEntry.
181
182   """
183   hi = utils.HostInfo(name=hostname)
184   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
185
186
187 def _RemoveHostFromEtcHosts(hostname):
188   """Wrapper around utils.RemoveEtcHostsEntry.
189
190   """
191   hi = utils.HostInfo(name=hostname)
192   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
193   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
194
195
196 def _GetWantedNodes(lu, nodes):
197   """Returns list of checked and expanded node names.
198
199   Args:
200     nodes: List of nodes (strings) or None for all
201
202   """
203   if not isinstance(nodes, list):
204     raise errors.OpPrereqError("Invalid argument type 'nodes'")
205
206   if nodes:
207     wanted = []
208
209     for name in nodes:
210       node = lu.cfg.ExpandNodeName(name)
211       if node is None:
212         raise errors.OpPrereqError("No such node name '%s'" % name)
213       wanted.append(node)
214
215   else:
216     wanted = lu.cfg.GetNodeList()
217   return utils.NiceSort(wanted)
218
219
220 def _GetWantedInstances(lu, instances):
221   """Returns list of checked and expanded instance names.
222
223   Args:
224     instances: List of instances (strings) or None for all
225
226   """
227   if not isinstance(instances, list):
228     raise errors.OpPrereqError("Invalid argument type 'instances'")
229
230   if instances:
231     wanted = []
232
233     for name in instances:
234       instance = lu.cfg.ExpandInstanceName(name)
235       if instance is None:
236         raise errors.OpPrereqError("No such instance name '%s'" % name)
237       wanted.append(instance)
238
239   else:
240     wanted = lu.cfg.GetInstanceList()
241   return utils.NiceSort(wanted)
242
243
244 def _CheckOutputFields(static, dynamic, selected):
245   """Checks whether all selected fields are valid.
246
247   Args:
248     static: Static fields
249     dynamic: Dynamic fields
250
251   """
252   static_fields = frozenset(static)
253   dynamic_fields = frozenset(dynamic)
254
255   all_fields = static_fields | dynamic_fields
256
257   if not all_fields.issuperset(selected):
258     raise errors.OpPrereqError("Unknown output fields selected: %s"
259                                % ",".join(frozenset(selected).
260                                           difference(all_fields)))
261
262
263 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264                           memory, vcpus, nics):
265   """Builds instance related env variables for hooks from single variables.
266
267   Args:
268     secondary_nodes: List of secondary nodes as strings
269   """
270   env = {
271     "OP_TARGET": name,
272     "INSTANCE_NAME": name,
273     "INSTANCE_PRIMARY": primary_node,
274     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275     "INSTANCE_OS_TYPE": os_type,
276     "INSTANCE_STATUS": status,
277     "INSTANCE_MEMORY": memory,
278     "INSTANCE_VCPUS": vcpus,
279   }
280
281   if nics:
282     nic_count = len(nics)
283     for idx, (ip, bridge, mac) in enumerate(nics):
284       if ip is None:
285         ip = ""
286       env["INSTANCE_NIC%d_IP" % idx] = ip
287       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
289   else:
290     nic_count = 0
291
292   env["INSTANCE_NIC_COUNT"] = nic_count
293
294   return env
295
296
297 def _BuildInstanceHookEnvByObject(instance, override=None):
298   """Builds instance related env variables for hooks from an object.
299
300   Args:
301     instance: objects.Instance object of instance
302     override: dict of values to override
303   """
304   args = {
305     'name': instance.name,
306     'primary_node': instance.primary_node,
307     'secondary_nodes': instance.secondary_nodes,
308     'os_type': instance.os,
309     'status': instance.os,
310     'memory': instance.memory,
311     'vcpus': instance.vcpus,
312     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
313   }
314   if override:
315     args.update(override)
316   return _BuildInstanceHookEnv(**args)
317
318
319 def _HasValidVG(vglist, vgname):
320   """Checks if the volume group list is valid.
321
322   A non-None return value means there's an error, and the return value
323   is the error message.
324
325   """
326   vgsize = vglist.get(vgname, None)
327   if vgsize is None:
328     return "volume group '%s' missing" % vgname
329   elif vgsize < 20480:
330     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
331             (vgname, vgsize))
332   return None
333
334
335 def _InitSSHSetup(node):
336   """Setup the SSH configuration for the cluster.
337
338
339   This generates a dsa keypair for root, adds the pub key to the
340   permitted hosts and adds the hostkey to its own known hosts.
341
342   Args:
343     node: the name of this host as a fqdn
344
345   """
346   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
347
348   for name in priv_key, pub_key:
349     if os.path.exists(name):
350       utils.CreateBackup(name)
351     utils.RemoveFile(name)
352
353   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
354                          "-f", priv_key,
355                          "-q", "-N", ""])
356   if result.failed:
357     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
358                              result.output)
359
360   f = open(pub_key, 'r')
361   try:
362     utils.AddAuthorizedKey(auth_keys, f.read(8192))
363   finally:
364     f.close()
365
366
367 def _InitGanetiServerSetup(ss):
368   """Setup the necessary configuration for the initial node daemon.
369
370   This creates the nodepass file containing the shared password for
371   the cluster and also generates the SSL certificate.
372
373   """
374   # Create pseudo random password
375   randpass = sha.new(os.urandom(64)).hexdigest()
376   # and write it into sstore
377   ss.SetKey(ss.SS_NODED_PASS, randpass)
378
379   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
380                          "-days", str(365*5), "-nodes", "-x509",
381                          "-keyout", constants.SSL_CERT_FILE,
382                          "-out", constants.SSL_CERT_FILE, "-batch"])
383   if result.failed:
384     raise errors.OpExecError("could not generate server ssl cert, command"
385                              " %s had exitcode %s and error message %s" %
386                              (result.cmd, result.exit_code, result.output))
387
388   os.chmod(constants.SSL_CERT_FILE, 0400)
389
390   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
391
392   if result.failed:
393     raise errors.OpExecError("Could not start the node daemon, command %s"
394                              " had exitcode %s and error %s" %
395                              (result.cmd, result.exit_code, result.output))
396
397
398 def _CheckInstanceBridgesExist(instance):
399   """Check that the brigdes needed by an instance exist.
400
401   """
402   # check bridges existance
403   brlist = [nic.bridge for nic in instance.nics]
404   if not rpc.call_bridges_exist(instance.primary_node, brlist):
405     raise errors.OpPrereqError("one or more target bridges %s does not"
406                                " exist on destination node '%s'" %
407                                (brlist, instance.primary_node))
408
409
410 class LUInitCluster(LogicalUnit):
411   """Initialise the cluster.
412
413   """
414   HPATH = "cluster-init"
415   HTYPE = constants.HTYPE_CLUSTER
416   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
417               "def_bridge", "master_netdev", "file_storage_dir"]
418   REQ_CLUSTER = False
419
420   def BuildHooksEnv(self):
421     """Build hooks env.
422
423     Notes: Since we don't require a cluster, we must manually add
424     ourselves in the post-run node list.
425
426     """
427     env = {"OP_TARGET": self.op.cluster_name}
428     return env, [], [self.hostname.name]
429
430   def CheckPrereq(self):
431     """Verify that the passed name is a valid one.
432
433     """
434     if config.ConfigWriter.IsCluster():
435       raise errors.OpPrereqError("Cluster is already initialised")
436
437     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
438       if not os.path.exists(constants.VNC_PASSWORD_FILE):
439         raise errors.OpPrereqError("Please prepare the cluster VNC"
440                                    "password file %s" %
441                                    constants.VNC_PASSWORD_FILE)
442
443     self.hostname = hostname = utils.HostInfo()
444
445     if hostname.ip.startswith("127."):
446       raise errors.OpPrereqError("This host's IP resolves to the private"
447                                  " range (%s). Please fix DNS or %s." %
448                                  (hostname.ip, constants.ETC_HOSTS))
449
450     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
451                          source=constants.LOCALHOST_IP_ADDRESS):
452       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
453                                  " to %s,\nbut this ip address does not"
454                                  " belong to this host."
455                                  " Aborting." % hostname.ip)
456
457     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
458
459     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
460                      timeout=5):
461       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
462
463     secondary_ip = getattr(self.op, "secondary_ip", None)
464     if secondary_ip and not utils.IsValidIP(secondary_ip):
465       raise errors.OpPrereqError("Invalid secondary ip given")
466     if (secondary_ip and
467         secondary_ip != hostname.ip and
468         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
469                            source=constants.LOCALHOST_IP_ADDRESS))):
470       raise errors.OpPrereqError("You gave %s as secondary IP,"
471                                  " but it does not belong to this host." %
472                                  secondary_ip)
473     self.secondary_ip = secondary_ip
474
475     # checks presence of the volume group given
476     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
477
478     if vgstatus:
479       raise errors.OpPrereqError("Error: %s" % vgstatus)
480
481     self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
482
483     if not os.path.isabs(self.op.file_storage_dir):
484       raise errors.OpPrereqError("The file storage directory you have is"
485                                  " not an absolute path.")
486
487     if not os.path.exists(self.op.file_storage_dir):
488       try:
489         os.makedirs(self.op.file_storage_dir, 0750)
490       except OSError, err:
491         raise errors.OpPrereqError("Cannot create file storage directory"
492                                    " '%s': %s" %
493                                    (self.op.file_storage_dir, err))
494
495     if not os.path.isdir(self.op.file_storage_dir):
496       raise errors.OpPrereqError("The file storage directory '%s' is not"
497                                  " a directory." % self.op.file_storage_dir)
498
499     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
500                     self.op.mac_prefix):
501       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
502                                  self.op.mac_prefix)
503
504     if self.op.hypervisor_type not in constants.HYPER_TYPES:
505       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
506                                  self.op.hypervisor_type)
507
508     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
509     if result.failed:
510       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
511                                  (self.op.master_netdev,
512                                   result.output.strip()))
513
514     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
515             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
516       raise errors.OpPrereqError("Init.d script '%s' missing or not"
517                                  " executable." % constants.NODE_INITD_SCRIPT)
518
519   def Exec(self, feedback_fn):
520     """Initialize the cluster.
521
522     """
523     clustername = self.clustername
524     hostname = self.hostname
525
526     # set up the simple store
527     self.sstore = ss = ssconf.SimpleStore()
528     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
529     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
530     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
531     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
532     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
533     ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
534
535     # set up the inter-node password and certificate
536     _InitGanetiServerSetup(ss)
537
538     # start the master ip
539     rpc.call_node_start_master(hostname.name)
540
541     # set up ssh config and /etc/hosts
542     f = open(constants.SSH_HOST_RSA_PUB, 'r')
543     try:
544       sshline = f.read()
545     finally:
546       f.close()
547     sshkey = sshline.split(" ")[1]
548
549     _AddHostToEtcHosts(hostname.name)
550     _InitSSHSetup(hostname.name)
551
552     # init of cluster config file
553     self.cfg = cfgw = config.ConfigWriter()
554     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
555                     sshkey, self.op.mac_prefix,
556                     self.op.vg_name, self.op.def_bridge)
557
558     ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
559
560
561 class LUDestroyCluster(NoHooksLU):
562   """Logical unit for destroying the cluster.
563
564   """
565   _OP_REQP = []
566
567   def CheckPrereq(self):
568     """Check prerequisites.
569
570     This checks whether the cluster is empty.
571
572     Any errors are signalled by raising errors.OpPrereqError.
573
574     """
575     master = self.sstore.GetMasterNode()
576
577     nodelist = self.cfg.GetNodeList()
578     if len(nodelist) != 1 or nodelist[0] != master:
579       raise errors.OpPrereqError("There are still %d node(s) in"
580                                  " this cluster." % (len(nodelist) - 1))
581     instancelist = self.cfg.GetInstanceList()
582     if instancelist:
583       raise errors.OpPrereqError("There are still %d instance(s) in"
584                                  " this cluster." % len(instancelist))
585
586   def Exec(self, feedback_fn):
587     """Destroys the cluster.
588
589     """
590     master = self.sstore.GetMasterNode()
591     if not rpc.call_node_stop_master(master):
592       raise errors.OpExecError("Could not disable the master role")
593     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
594     utils.CreateBackup(priv_key)
595     utils.CreateBackup(pub_key)
596     rpc.call_node_leave_cluster(master)
597
598
599 class LUVerifyCluster(NoHooksLU):
600   """Verifies the cluster status.
601
602   """
603   _OP_REQP = []
604
605   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
606                   remote_version, feedback_fn):
607     """Run multiple tests against a node.
608
609     Test list:
610       - compares ganeti version
611       - checks vg existance and size > 20G
612       - checks config file checksum
613       - checks ssh to other nodes
614
615     Args:
616       node: name of the node to check
617       file_list: required list of files
618       local_cksum: dictionary of local files and their checksums
619
620     """
621     # compares ganeti version
622     local_version = constants.PROTOCOL_VERSION
623     if not remote_version:
624       feedback_fn(" - ERROR: connection to %s failed" % (node))
625       return True
626
627     if local_version != remote_version:
628       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
629                       (local_version, node, remote_version))
630       return True
631
632     # checks vg existance and size > 20G
633
634     bad = False
635     if not vglist:
636       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
637                       (node,))
638       bad = True
639     else:
640       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
641       if vgstatus:
642         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
643         bad = True
644
645     # checks config file checksum
646     # checks ssh to any
647
648     if 'filelist' not in node_result:
649       bad = True
650       feedback_fn("  - ERROR: node hasn't returned file checksum data")
651     else:
652       remote_cksum = node_result['filelist']
653       for file_name in file_list:
654         if file_name not in remote_cksum:
655           bad = True
656           feedback_fn("  - ERROR: file '%s' missing" % file_name)
657         elif remote_cksum[file_name] != local_cksum[file_name]:
658           bad = True
659           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
660
661     if 'nodelist' not in node_result:
662       bad = True
663       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
664     else:
665       if node_result['nodelist']:
666         bad = True
667         for node in node_result['nodelist']:
668           feedback_fn("  - ERROR: communication with node '%s': %s" %
669                           (node, node_result['nodelist'][node]))
670     hyp_result = node_result.get('hypervisor', None)
671     if hyp_result is not None:
672       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
673     return bad
674
675   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
676     """Verify an instance.
677
678     This function checks to see if the required block devices are
679     available on the instance's node.
680
681     """
682     bad = False
683
684     instancelist = self.cfg.GetInstanceList()
685     if not instance in instancelist:
686       feedback_fn("  - ERROR: instance %s not in instance list %s" %
687                       (instance, instancelist))
688       bad = True
689
690     instanceconfig = self.cfg.GetInstanceInfo(instance)
691     node_current = instanceconfig.primary_node
692
693     node_vol_should = {}
694     instanceconfig.MapLVsByNode(node_vol_should)
695
696     for node in node_vol_should:
697       for volume in node_vol_should[node]:
698         if node not in node_vol_is or volume not in node_vol_is[node]:
699           feedback_fn("  - ERROR: volume %s missing on node %s" %
700                           (volume, node))
701           bad = True
702
703     if not instanceconfig.status == 'down':
704       if not instance in node_instance[node_current]:
705         feedback_fn("  - ERROR: instance %s not running on node %s" %
706                         (instance, node_current))
707         bad = True
708
709     for node in node_instance:
710       if (not node == node_current):
711         if instance in node_instance[node]:
712           feedback_fn("  - ERROR: instance %s should not run on node %s" %
713                           (instance, node))
714           bad = True
715
716     return bad
717
718   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
719     """Verify if there are any unknown volumes in the cluster.
720
721     The .os, .swap and backup volumes are ignored. All other volumes are
722     reported as unknown.
723
724     """
725     bad = False
726
727     for node in node_vol_is:
728       for volume in node_vol_is[node]:
729         if node not in node_vol_should or volume not in node_vol_should[node]:
730           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
731                       (volume, node))
732           bad = True
733     return bad
734
735   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
736     """Verify the list of running instances.
737
738     This checks what instances are running but unknown to the cluster.
739
740     """
741     bad = False
742     for node in node_instance:
743       for runninginstance in node_instance[node]:
744         if runninginstance not in instancelist:
745           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
746                           (runninginstance, node))
747           bad = True
748     return bad
749
750   def CheckPrereq(self):
751     """Check prerequisites.
752
753     This has no prerequisites.
754
755     """
756     pass
757
758   def Exec(self, feedback_fn):
759     """Verify integrity of cluster, performing various test on nodes.
760
761     """
762     bad = False
763     feedback_fn("* Verifying global settings")
764     for msg in self.cfg.VerifyConfig():
765       feedback_fn("  - ERROR: %s" % msg)
766
767     vg_name = self.cfg.GetVGName()
768     nodelist = utils.NiceSort(self.cfg.GetNodeList())
769     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
770     node_volume = {}
771     node_instance = {}
772
773     # FIXME: verify OS list
774     # do local checksums
775     file_names = list(self.sstore.GetFileList())
776     file_names.append(constants.SSL_CERT_FILE)
777     file_names.append(constants.CLUSTER_CONF_FILE)
778     local_checksums = utils.FingerprintFiles(file_names)
779
780     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
781     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
782     all_instanceinfo = rpc.call_instance_list(nodelist)
783     all_vglist = rpc.call_vg_list(nodelist)
784     node_verify_param = {
785       'filelist': file_names,
786       'nodelist': nodelist,
787       'hypervisor': None,
788       }
789     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
790     all_rversion = rpc.call_version(nodelist)
791
792     for node in nodelist:
793       feedback_fn("* Verifying node %s" % node)
794       result = self._VerifyNode(node, file_names, local_checksums,
795                                 all_vglist[node], all_nvinfo[node],
796                                 all_rversion[node], feedback_fn)
797       bad = bad or result
798
799       # node_volume
800       volumeinfo = all_volumeinfo[node]
801
802       if isinstance(volumeinfo, basestring):
803         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
804                     (node, volumeinfo[-400:].encode('string_escape')))
805         bad = True
806         node_volume[node] = {}
807       elif not isinstance(volumeinfo, dict):
808         feedback_fn("  - ERROR: connection to %s failed" % (node,))
809         bad = True
810         continue
811       else:
812         node_volume[node] = volumeinfo
813
814       # node_instance
815       nodeinstance = all_instanceinfo[node]
816       if type(nodeinstance) != list:
817         feedback_fn("  - ERROR: connection to %s failed" % (node,))
818         bad = True
819         continue
820
821       node_instance[node] = nodeinstance
822
823     node_vol_should = {}
824
825     for instance in instancelist:
826       feedback_fn("* Verifying instance %s" % instance)
827       result =  self._VerifyInstance(instance, node_volume, node_instance,
828                                      feedback_fn)
829       bad = bad or result
830
831       inst_config = self.cfg.GetInstanceInfo(instance)
832
833       inst_config.MapLVsByNode(node_vol_should)
834
835     feedback_fn("* Verifying orphan volumes")
836     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
837                                        feedback_fn)
838     bad = bad or result
839
840     feedback_fn("* Verifying remaining instances")
841     result = self._VerifyOrphanInstances(instancelist, node_instance,
842                                          feedback_fn)
843     bad = bad or result
844
845     return int(bad)
846
847
848 class LUVerifyDisks(NoHooksLU):
849   """Verifies the cluster disks status.
850
851   """
852   _OP_REQP = []
853
854   def CheckPrereq(self):
855     """Check prerequisites.
856
857     This has no prerequisites.
858
859     """
860     pass
861
862   def Exec(self, feedback_fn):
863     """Verify integrity of cluster disks.
864
865     """
866     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
867
868     vg_name = self.cfg.GetVGName()
869     nodes = utils.NiceSort(self.cfg.GetNodeList())
870     instances = [self.cfg.GetInstanceInfo(name)
871                  for name in self.cfg.GetInstanceList()]
872
873     nv_dict = {}
874     for inst in instances:
875       inst_lvs = {}
876       if (inst.status != "up" or
877           inst.disk_template not in constants.DTS_NET_MIRROR):
878         continue
879       inst.MapLVsByNode(inst_lvs)
880       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
881       for node, vol_list in inst_lvs.iteritems():
882         for vol in vol_list:
883           nv_dict[(node, vol)] = inst
884
885     if not nv_dict:
886       return result
887
888     node_lvs = rpc.call_volume_list(nodes, vg_name)
889
890     to_act = set()
891     for node in nodes:
892       # node_volume
893       lvs = node_lvs[node]
894
895       if isinstance(lvs, basestring):
896         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
897         res_nlvm[node] = lvs
898       elif not isinstance(lvs, dict):
899         logger.Info("connection to node %s failed or invalid data returned" %
900                     (node,))
901         res_nodes.append(node)
902         continue
903
904       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
905         inst = nv_dict.pop((node, lv_name), None)
906         if (not lv_online and inst is not None
907             and inst.name not in res_instances):
908           res_instances.append(inst.name)
909
910     # any leftover items in nv_dict are missing LVs, let's arrange the
911     # data better
912     for key, inst in nv_dict.iteritems():
913       if inst.name not in res_missing:
914         res_missing[inst.name] = []
915       res_missing[inst.name].append(key)
916
917     return result
918
919
920 class LURenameCluster(LogicalUnit):
921   """Rename the cluster.
922
923   """
924   HPATH = "cluster-rename"
925   HTYPE = constants.HTYPE_CLUSTER
926   _OP_REQP = ["name"]
927
928   def BuildHooksEnv(self):
929     """Build hooks env.
930
931     """
932     env = {
933       "OP_TARGET": self.sstore.GetClusterName(),
934       "NEW_NAME": self.op.name,
935       }
936     mn = self.sstore.GetMasterNode()
937     return env, [mn], [mn]
938
939   def CheckPrereq(self):
940     """Verify that the passed name is a valid one.
941
942     """
943     hostname = utils.HostInfo(self.op.name)
944
945     new_name = hostname.name
946     self.ip = new_ip = hostname.ip
947     old_name = self.sstore.GetClusterName()
948     old_ip = self.sstore.GetMasterIP()
949     if new_name == old_name and new_ip == old_ip:
950       raise errors.OpPrereqError("Neither the name nor the IP address of the"
951                                  " cluster has changed")
952     if new_ip != old_ip:
953       result = utils.RunCmd(["fping", "-q", new_ip])
954       if not result.failed:
955         raise errors.OpPrereqError("The given cluster IP address (%s) is"
956                                    " reachable on the network. Aborting." %
957                                    new_ip)
958
959     self.op.name = new_name
960
961   def Exec(self, feedback_fn):
962     """Rename the cluster.
963
964     """
965     clustername = self.op.name
966     ip = self.ip
967     ss = self.sstore
968
969     # shutdown the master IP
970     master = ss.GetMasterNode()
971     if not rpc.call_node_stop_master(master):
972       raise errors.OpExecError("Could not disable the master role")
973
974     try:
975       # modify the sstore
976       ss.SetKey(ss.SS_MASTER_IP, ip)
977       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
978
979       # Distribute updated ss config to all nodes
980       myself = self.cfg.GetNodeInfo(master)
981       dist_nodes = self.cfg.GetNodeList()
982       if myself.name in dist_nodes:
983         dist_nodes.remove(myself.name)
984
985       logger.Debug("Copying updated ssconf data to all nodes")
986       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
987         fname = ss.KeyToFilename(keyname)
988         result = rpc.call_upload_file(dist_nodes, fname)
989         for to_node in dist_nodes:
990           if not result[to_node]:
991             logger.Error("copy of file %s to node %s failed" %
992                          (fname, to_node))
993     finally:
994       if not rpc.call_node_start_master(master):
995         logger.Error("Could not re-enable the master role on the master,"
996                      " please restart manually.")
997
998
999 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1000   """Sleep and poll for an instance's disk to sync.
1001
1002   """
1003   if not instance.disks:
1004     return True
1005
1006   if not oneshot:
1007     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1008
1009   node = instance.primary_node
1010
1011   for dev in instance.disks:
1012     cfgw.SetDiskID(dev, node)
1013
1014   retries = 0
1015   while True:
1016     max_time = 0
1017     done = True
1018     cumul_degraded = False
1019     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1020     if not rstats:
1021       proc.LogWarning("Can't get any data from node %s" % node)
1022       retries += 1
1023       if retries >= 10:
1024         raise errors.RemoteError("Can't contact node %s for mirror data,"
1025                                  " aborting." % node)
1026       time.sleep(6)
1027       continue
1028     retries = 0
1029     for i in range(len(rstats)):
1030       mstat = rstats[i]
1031       if mstat is None:
1032         proc.LogWarning("Can't compute data for node %s/%s" %
1033                         (node, instance.disks[i].iv_name))
1034         continue
1035       # we ignore the ldisk parameter
1036       perc_done, est_time, is_degraded, _ = mstat
1037       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1038       if perc_done is not None:
1039         done = False
1040         if est_time is not None:
1041           rem_time = "%d estimated seconds remaining" % est_time
1042           max_time = est_time
1043         else:
1044           rem_time = "no time estimate"
1045         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1046                      (instance.disks[i].iv_name, perc_done, rem_time))
1047     if done or oneshot:
1048       break
1049
1050     if unlock:
1051       utils.Unlock('cmd')
1052     try:
1053       time.sleep(min(60, max_time))
1054     finally:
1055       if unlock:
1056         utils.Lock('cmd')
1057
1058   if done:
1059     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1060   return not cumul_degraded
1061
1062
1063 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1064   """Check that mirrors are not degraded.
1065
1066   The ldisk parameter, if True, will change the test from the
1067   is_degraded attribute (which represents overall non-ok status for
1068   the device(s)) to the ldisk (representing the local storage status).
1069
1070   """
1071   cfgw.SetDiskID(dev, node)
1072   if ldisk:
1073     idx = 6
1074   else:
1075     idx = 5
1076
1077   result = True
1078   if on_primary or dev.AssembleOnSecondary():
1079     rstats = rpc.call_blockdev_find(node, dev)
1080     if not rstats:
1081       logger.ToStderr("Can't get any data from node %s" % node)
1082       result = False
1083     else:
1084       result = result and (not rstats[idx])
1085   if dev.children:
1086     for child in dev.children:
1087       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1088
1089   return result
1090
1091
1092 class LUDiagnoseOS(NoHooksLU):
1093   """Logical unit for OS diagnose/query.
1094
1095   """
1096   _OP_REQP = []
1097
1098   def CheckPrereq(self):
1099     """Check prerequisites.
1100
1101     This always succeeds, since this is a pure query LU.
1102
1103     """
1104     return
1105
1106   def Exec(self, feedback_fn):
1107     """Compute the list of OSes.
1108
1109     """
1110     node_list = self.cfg.GetNodeList()
1111     node_data = rpc.call_os_diagnose(node_list)
1112     if node_data == False:
1113       raise errors.OpExecError("Can't gather the list of OSes")
1114     return node_data
1115
1116
1117 class LURemoveNode(LogicalUnit):
1118   """Logical unit for removing a node.
1119
1120   """
1121   HPATH = "node-remove"
1122   HTYPE = constants.HTYPE_NODE
1123   _OP_REQP = ["node_name"]
1124
1125   def BuildHooksEnv(self):
1126     """Build hooks env.
1127
1128     This doesn't run on the target node in the pre phase as a failed
1129     node would not allows itself to run.
1130
1131     """
1132     env = {
1133       "OP_TARGET": self.op.node_name,
1134       "NODE_NAME": self.op.node_name,
1135       }
1136     all_nodes = self.cfg.GetNodeList()
1137     all_nodes.remove(self.op.node_name)
1138     return env, all_nodes, all_nodes
1139
1140   def CheckPrereq(self):
1141     """Check prerequisites.
1142
1143     This checks:
1144      - the node exists in the configuration
1145      - it does not have primary or secondary instances
1146      - it's not the master
1147
1148     Any errors are signalled by raising errors.OpPrereqError.
1149
1150     """
1151     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1152     if node is None:
1153       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1154
1155     instance_list = self.cfg.GetInstanceList()
1156
1157     masternode = self.sstore.GetMasterNode()
1158     if node.name == masternode:
1159       raise errors.OpPrereqError("Node is the master node,"
1160                                  " you need to failover first.")
1161
1162     for instance_name in instance_list:
1163       instance = self.cfg.GetInstanceInfo(instance_name)
1164       if node.name == instance.primary_node:
1165         raise errors.OpPrereqError("Instance %s still running on the node,"
1166                                    " please remove first." % instance_name)
1167       if node.name in instance.secondary_nodes:
1168         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1169                                    " please remove first." % instance_name)
1170     self.op.node_name = node.name
1171     self.node = node
1172
1173   def Exec(self, feedback_fn):
1174     """Removes the node from the cluster.
1175
1176     """
1177     node = self.node
1178     logger.Info("stopping the node daemon and removing configs from node %s" %
1179                 node.name)
1180
1181     rpc.call_node_leave_cluster(node.name)
1182
1183     self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1184
1185     logger.Info("Removing node %s from config" % node.name)
1186
1187     self.cfg.RemoveNode(node.name)
1188
1189     _RemoveHostFromEtcHosts(node.name)
1190
1191
1192 class LUQueryNodes(NoHooksLU):
1193   """Logical unit for querying nodes.
1194
1195   """
1196   _OP_REQP = ["output_fields", "names"]
1197
1198   def CheckPrereq(self):
1199     """Check prerequisites.
1200
1201     This checks that the fields required are valid output fields.
1202
1203     """
1204     self.dynamic_fields = frozenset(["dtotal", "dfree",
1205                                      "mtotal", "mnode", "mfree",
1206                                      "bootid"])
1207
1208     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1209                                "pinst_list", "sinst_list",
1210                                "pip", "sip"],
1211                        dynamic=self.dynamic_fields,
1212                        selected=self.op.output_fields)
1213
1214     self.wanted = _GetWantedNodes(self, self.op.names)
1215
1216   def Exec(self, feedback_fn):
1217     """Computes the list of nodes and their attributes.
1218
1219     """
1220     nodenames = self.wanted
1221     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1222
1223     # begin data gathering
1224
1225     if self.dynamic_fields.intersection(self.op.output_fields):
1226       live_data = {}
1227       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1228       for name in nodenames:
1229         nodeinfo = node_data.get(name, None)
1230         if nodeinfo:
1231           live_data[name] = {
1232             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1233             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1234             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1235             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1236             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1237             "bootid": nodeinfo['bootid'],
1238             }
1239         else:
1240           live_data[name] = {}
1241     else:
1242       live_data = dict.fromkeys(nodenames, {})
1243
1244     node_to_primary = dict([(name, set()) for name in nodenames])
1245     node_to_secondary = dict([(name, set()) for name in nodenames])
1246
1247     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1248                              "sinst_cnt", "sinst_list"))
1249     if inst_fields & frozenset(self.op.output_fields):
1250       instancelist = self.cfg.GetInstanceList()
1251
1252       for instance_name in instancelist:
1253         inst = self.cfg.GetInstanceInfo(instance_name)
1254         if inst.primary_node in node_to_primary:
1255           node_to_primary[inst.primary_node].add(inst.name)
1256         for secnode in inst.secondary_nodes:
1257           if secnode in node_to_secondary:
1258             node_to_secondary[secnode].add(inst.name)
1259
1260     # end data gathering
1261
1262     output = []
1263     for node in nodelist:
1264       node_output = []
1265       for field in self.op.output_fields:
1266         if field == "name":
1267           val = node.name
1268         elif field == "pinst_list":
1269           val = list(node_to_primary[node.name])
1270         elif field == "sinst_list":
1271           val = list(node_to_secondary[node.name])
1272         elif field == "pinst_cnt":
1273           val = len(node_to_primary[node.name])
1274         elif field == "sinst_cnt":
1275           val = len(node_to_secondary[node.name])
1276         elif field == "pip":
1277           val = node.primary_ip
1278         elif field == "sip":
1279           val = node.secondary_ip
1280         elif field in self.dynamic_fields:
1281           val = live_data[node.name].get(field, None)
1282         else:
1283           raise errors.ParameterError(field)
1284         node_output.append(val)
1285       output.append(node_output)
1286
1287     return output
1288
1289
1290 class LUQueryNodeVolumes(NoHooksLU):
1291   """Logical unit for getting volumes on node(s).
1292
1293   """
1294   _OP_REQP = ["nodes", "output_fields"]
1295
1296   def CheckPrereq(self):
1297     """Check prerequisites.
1298
1299     This checks that the fields required are valid output fields.
1300
1301     """
1302     self.nodes = _GetWantedNodes(self, self.op.nodes)
1303
1304     _CheckOutputFields(static=["node"],
1305                        dynamic=["phys", "vg", "name", "size", "instance"],
1306                        selected=self.op.output_fields)
1307
1308
1309   def Exec(self, feedback_fn):
1310     """Computes the list of nodes and their attributes.
1311
1312     """
1313     nodenames = self.nodes
1314     volumes = rpc.call_node_volumes(nodenames)
1315
1316     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1317              in self.cfg.GetInstanceList()]
1318
1319     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1320
1321     output = []
1322     for node in nodenames:
1323       if node not in volumes or not volumes[node]:
1324         continue
1325
1326       node_vols = volumes[node][:]
1327       node_vols.sort(key=lambda vol: vol['dev'])
1328
1329       for vol in node_vols:
1330         node_output = []
1331         for field in self.op.output_fields:
1332           if field == "node":
1333             val = node
1334           elif field == "phys":
1335             val = vol['dev']
1336           elif field == "vg":
1337             val = vol['vg']
1338           elif field == "name":
1339             val = vol['name']
1340           elif field == "size":
1341             val = int(float(vol['size']))
1342           elif field == "instance":
1343             for inst in ilist:
1344               if node not in lv_by_node[inst]:
1345                 continue
1346               if vol['name'] in lv_by_node[inst][node]:
1347                 val = inst.name
1348                 break
1349             else:
1350               val = '-'
1351           else:
1352             raise errors.ParameterError(field)
1353           node_output.append(str(val))
1354
1355         output.append(node_output)
1356
1357     return output
1358
1359
1360 class LUAddNode(LogicalUnit):
1361   """Logical unit for adding node to the cluster.
1362
1363   """
1364   HPATH = "node-add"
1365   HTYPE = constants.HTYPE_NODE
1366   _OP_REQP = ["node_name"]
1367
1368   def BuildHooksEnv(self):
1369     """Build hooks env.
1370
1371     This will run on all nodes before, and on all nodes + the new node after.
1372
1373     """
1374     env = {
1375       "OP_TARGET": self.op.node_name,
1376       "NODE_NAME": self.op.node_name,
1377       "NODE_PIP": self.op.primary_ip,
1378       "NODE_SIP": self.op.secondary_ip,
1379       }
1380     nodes_0 = self.cfg.GetNodeList()
1381     nodes_1 = nodes_0 + [self.op.node_name, ]
1382     return env, nodes_0, nodes_1
1383
1384   def CheckPrereq(self):
1385     """Check prerequisites.
1386
1387     This checks:
1388      - the new node is not already in the config
1389      - it is resolvable
1390      - its parameters (single/dual homed) matches the cluster
1391
1392     Any errors are signalled by raising errors.OpPrereqError.
1393
1394     """
1395     node_name = self.op.node_name
1396     cfg = self.cfg
1397
1398     dns_data = utils.HostInfo(node_name)
1399
1400     node = dns_data.name
1401     primary_ip = self.op.primary_ip = dns_data.ip
1402     secondary_ip = getattr(self.op, "secondary_ip", None)
1403     if secondary_ip is None:
1404       secondary_ip = primary_ip
1405     if not utils.IsValidIP(secondary_ip):
1406       raise errors.OpPrereqError("Invalid secondary IP given")
1407     self.op.secondary_ip = secondary_ip
1408     node_list = cfg.GetNodeList()
1409     if node in node_list:
1410       raise errors.OpPrereqError("Node %s is already in the configuration"
1411                                  % node)
1412
1413     for existing_node_name in node_list:
1414       existing_node = cfg.GetNodeInfo(existing_node_name)
1415       if (existing_node.primary_ip == primary_ip or
1416           existing_node.secondary_ip == primary_ip or
1417           existing_node.primary_ip == secondary_ip or
1418           existing_node.secondary_ip == secondary_ip):
1419         raise errors.OpPrereqError("New node ip address(es) conflict with"
1420                                    " existing node %s" % existing_node.name)
1421
1422     # check that the type of the node (single versus dual homed) is the
1423     # same as for the master
1424     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1425     master_singlehomed = myself.secondary_ip == myself.primary_ip
1426     newbie_singlehomed = secondary_ip == primary_ip
1427     if master_singlehomed != newbie_singlehomed:
1428       if master_singlehomed:
1429         raise errors.OpPrereqError("The master has no private ip but the"
1430                                    " new node has one")
1431       else:
1432         raise errors.OpPrereqError("The master has a private ip but the"
1433                                    " new node doesn't have one")
1434
1435     # checks reachablity
1436     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1437       raise errors.OpPrereqError("Node not reachable by ping")
1438
1439     if not newbie_singlehomed:
1440       # check reachability from my secondary ip to newbie's secondary ip
1441       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1442                            source=myself.secondary_ip):
1443         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1444                                    " based ping to noded port")
1445
1446     self.new_node = objects.Node(name=node,
1447                                  primary_ip=primary_ip,
1448                                  secondary_ip=secondary_ip)
1449
1450     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1451       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1452         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1453                                    constants.VNC_PASSWORD_FILE)
1454
1455   def Exec(self, feedback_fn):
1456     """Adds the new node to the cluster.
1457
1458     """
1459     new_node = self.new_node
1460     node = new_node.name
1461
1462     # set up inter-node password and certificate and restarts the node daemon
1463     gntpass = self.sstore.GetNodeDaemonPassword()
1464     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1465       raise errors.OpExecError("ganeti password corruption detected")
1466     f = open(constants.SSL_CERT_FILE)
1467     try:
1468       gntpem = f.read(8192)
1469     finally:
1470       f.close()
1471     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1472     # so we use this to detect an invalid certificate; as long as the
1473     # cert doesn't contain this, the here-document will be correctly
1474     # parsed by the shell sequence below
1475     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1476       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1477     if not gntpem.endswith("\n"):
1478       raise errors.OpExecError("PEM must end with newline")
1479     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1480
1481     # and then connect with ssh to set password and start ganeti-noded
1482     # note that all the below variables are sanitized at this point,
1483     # either by being constants or by the checks above
1484     ss = self.sstore
1485     mycommand = ("umask 077 && "
1486                  "echo '%s' > '%s' && "
1487                  "cat > '%s' << '!EOF.' && \n"
1488                  "%s!EOF.\n%s restart" %
1489                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1490                   constants.SSL_CERT_FILE, gntpem,
1491                   constants.NODE_INITD_SCRIPT))
1492
1493     result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1494     if result.failed:
1495       raise errors.OpExecError("Remote command on node %s, error: %s,"
1496                                " output: %s" %
1497                                (node, result.fail_reason, result.output))
1498
1499     # check connectivity
1500     time.sleep(4)
1501
1502     result = rpc.call_version([node])[node]
1503     if result:
1504       if constants.PROTOCOL_VERSION == result:
1505         logger.Info("communication to node %s fine, sw version %s match" %
1506                     (node, result))
1507       else:
1508         raise errors.OpExecError("Version mismatch master version %s,"
1509                                  " node version %s" %
1510                                  (constants.PROTOCOL_VERSION, result))
1511     else:
1512       raise errors.OpExecError("Cannot get version from the new node")
1513
1514     # setup ssh on node
1515     logger.Info("copy ssh key to node %s" % node)
1516     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1517     keyarray = []
1518     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1519                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1520                 priv_key, pub_key]
1521
1522     for i in keyfiles:
1523       f = open(i, 'r')
1524       try:
1525         keyarray.append(f.read())
1526       finally:
1527         f.close()
1528
1529     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1530                                keyarray[3], keyarray[4], keyarray[5])
1531
1532     if not result:
1533       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1534
1535     # Add node to our /etc/hosts, and add key to known_hosts
1536     _AddHostToEtcHosts(new_node.name)
1537
1538     if new_node.secondary_ip != new_node.primary_ip:
1539       if not rpc.call_node_tcp_ping(new_node.name,
1540                                     constants.LOCALHOST_IP_ADDRESS,
1541                                     new_node.secondary_ip,
1542                                     constants.DEFAULT_NODED_PORT,
1543                                     10, False):
1544         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1545                                  " you gave (%s). Please fix and re-run this"
1546                                  " command." % new_node.secondary_ip)
1547
1548     success, msg = self.ssh.VerifyNodeHostname(node)
1549     if not success:
1550       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1551                                " than the one the resolver gives: %s."
1552                                " Please fix and re-run this command." %
1553                                (node, msg))
1554
1555     # Distribute updated /etc/hosts and known_hosts to all nodes,
1556     # including the node just added
1557     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1558     dist_nodes = self.cfg.GetNodeList() + [node]
1559     if myself.name in dist_nodes:
1560       dist_nodes.remove(myself.name)
1561
1562     logger.Debug("Copying hosts and known_hosts to all nodes")
1563     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1564       result = rpc.call_upload_file(dist_nodes, fname)
1565       for to_node in dist_nodes:
1566         if not result[to_node]:
1567           logger.Error("copy of file %s to node %s failed" %
1568                        (fname, to_node))
1569
1570     to_copy = ss.GetFileList()
1571     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1572       to_copy.append(constants.VNC_PASSWORD_FILE)
1573     for fname in to_copy:
1574       if not self.ssh.CopyFileToNode(node, fname):
1575         logger.Error("could not copy file %s to node %s" % (fname, node))
1576
1577     logger.Info("adding node %s to cluster.conf" % node)
1578     self.cfg.AddNode(new_node)
1579
1580
1581 class LUMasterFailover(LogicalUnit):
1582   """Failover the master node to the current node.
1583
1584   This is a special LU in that it must run on a non-master node.
1585
1586   """
1587   HPATH = "master-failover"
1588   HTYPE = constants.HTYPE_CLUSTER
1589   REQ_MASTER = False
1590   _OP_REQP = []
1591
1592   def BuildHooksEnv(self):
1593     """Build hooks env.
1594
1595     This will run on the new master only in the pre phase, and on all
1596     the nodes in the post phase.
1597
1598     """
1599     env = {
1600       "OP_TARGET": self.new_master,
1601       "NEW_MASTER": self.new_master,
1602       "OLD_MASTER": self.old_master,
1603       }
1604     return env, [self.new_master], self.cfg.GetNodeList()
1605
1606   def CheckPrereq(self):
1607     """Check prerequisites.
1608
1609     This checks that we are not already the master.
1610
1611     """
1612     self.new_master = utils.HostInfo().name
1613     self.old_master = self.sstore.GetMasterNode()
1614
1615     if self.old_master == self.new_master:
1616       raise errors.OpPrereqError("This commands must be run on the node"
1617                                  " where you want the new master to be."
1618                                  " %s is already the master" %
1619                                  self.old_master)
1620
1621   def Exec(self, feedback_fn):
1622     """Failover the master node.
1623
1624     This command, when run on a non-master node, will cause the current
1625     master to cease being master, and the non-master to become new
1626     master.
1627
1628     """
1629     #TODO: do not rely on gethostname returning the FQDN
1630     logger.Info("setting master to %s, old master: %s" %
1631                 (self.new_master, self.old_master))
1632
1633     if not rpc.call_node_stop_master(self.old_master):
1634       logger.Error("could disable the master role on the old master"
1635                    " %s, please disable manually" % self.old_master)
1636
1637     ss = self.sstore
1638     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1639     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1640                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1641       logger.Error("could not distribute the new simple store master file"
1642                    " to the other nodes, please check.")
1643
1644     if not rpc.call_node_start_master(self.new_master):
1645       logger.Error("could not start the master role on the new master"
1646                    " %s, please check" % self.new_master)
1647       feedback_fn("Error in activating the master IP on the new master,"
1648                   " please fix manually.")
1649
1650
1651
1652 class LUQueryClusterInfo(NoHooksLU):
1653   """Query cluster configuration.
1654
1655   """
1656   _OP_REQP = []
1657   REQ_MASTER = False
1658
1659   def CheckPrereq(self):
1660     """No prerequsites needed for this LU.
1661
1662     """
1663     pass
1664
1665   def Exec(self, feedback_fn):
1666     """Return cluster config.
1667
1668     """
1669     result = {
1670       "name": self.sstore.GetClusterName(),
1671       "software_version": constants.RELEASE_VERSION,
1672       "protocol_version": constants.PROTOCOL_VERSION,
1673       "config_version": constants.CONFIG_VERSION,
1674       "os_api_version": constants.OS_API_VERSION,
1675       "export_version": constants.EXPORT_VERSION,
1676       "master": self.sstore.GetMasterNode(),
1677       "architecture": (platform.architecture()[0], platform.machine()),
1678       }
1679
1680     return result
1681
1682
1683 class LUClusterCopyFile(NoHooksLU):
1684   """Copy file to cluster.
1685
1686   """
1687   _OP_REQP = ["nodes", "filename"]
1688
1689   def CheckPrereq(self):
1690     """Check prerequisites.
1691
1692     It should check that the named file exists and that the given list
1693     of nodes is valid.
1694
1695     """
1696     if not os.path.exists(self.op.filename):
1697       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1698
1699     self.nodes = _GetWantedNodes(self, self.op.nodes)
1700
1701   def Exec(self, feedback_fn):
1702     """Copy a file from master to some nodes.
1703
1704     Args:
1705       opts - class with options as members
1706       args - list containing a single element, the file name
1707     Opts used:
1708       nodes - list containing the name of target nodes; if empty, all nodes
1709
1710     """
1711     filename = self.op.filename
1712
1713     myname = utils.HostInfo().name
1714
1715     for node in self.nodes:
1716       if node == myname:
1717         continue
1718       if not self.ssh.CopyFileToNode(node, filename):
1719         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1720
1721
1722 class LUDumpClusterConfig(NoHooksLU):
1723   """Return a text-representation of the cluster-config.
1724
1725   """
1726   _OP_REQP = []
1727
1728   def CheckPrereq(self):
1729     """No prerequisites.
1730
1731     """
1732     pass
1733
1734   def Exec(self, feedback_fn):
1735     """Dump a representation of the cluster config to the standard output.
1736
1737     """
1738     return self.cfg.DumpConfig()
1739
1740
1741 class LURunClusterCommand(NoHooksLU):
1742   """Run a command on some nodes.
1743
1744   """
1745   _OP_REQP = ["command", "nodes"]
1746
1747   def CheckPrereq(self):
1748     """Check prerequisites.
1749
1750     It checks that the given list of nodes is valid.
1751
1752     """
1753     self.nodes = _GetWantedNodes(self, self.op.nodes)
1754
1755   def Exec(self, feedback_fn):
1756     """Run a command on some nodes.
1757
1758     """
1759     data = []
1760     for node in self.nodes:
1761       result = self.ssh.Run(node, "root", self.op.command)
1762       data.append((node, result.output, result.exit_code))
1763
1764     return data
1765
1766
1767 class LUActivateInstanceDisks(NoHooksLU):
1768   """Bring up an instance's disks.
1769
1770   """
1771   _OP_REQP = ["instance_name"]
1772
1773   def CheckPrereq(self):
1774     """Check prerequisites.
1775
1776     This checks that the instance is in the cluster.
1777
1778     """
1779     instance = self.cfg.GetInstanceInfo(
1780       self.cfg.ExpandInstanceName(self.op.instance_name))
1781     if instance is None:
1782       raise errors.OpPrereqError("Instance '%s' not known" %
1783                                  self.op.instance_name)
1784     self.instance = instance
1785
1786
1787   def Exec(self, feedback_fn):
1788     """Activate the disks.
1789
1790     """
1791     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1792     if not disks_ok:
1793       raise errors.OpExecError("Cannot activate block devices")
1794
1795     return disks_info
1796
1797
1798 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1799   """Prepare the block devices for an instance.
1800
1801   This sets up the block devices on all nodes.
1802
1803   Args:
1804     instance: a ganeti.objects.Instance object
1805     ignore_secondaries: if true, errors on secondary nodes won't result
1806                         in an error return from the function
1807
1808   Returns:
1809     false if the operation failed
1810     list of (host, instance_visible_name, node_visible_name) if the operation
1811          suceeded with the mapping from node devices to instance devices
1812   """
1813   device_info = []
1814   disks_ok = True
1815   iname = instance.name
1816   # With the two passes mechanism we try to reduce the window of
1817   # opportunity for the race condition of switching DRBD to primary
1818   # before handshaking occured, but we do not eliminate it
1819
1820   # The proper fix would be to wait (with some limits) until the
1821   # connection has been made and drbd transitions from WFConnection
1822   # into any other network-connected state (Connected, SyncTarget,
1823   # SyncSource, etc.)
1824
1825   # 1st pass, assemble on all nodes in secondary mode
1826   for inst_disk in instance.disks:
1827     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1828       cfg.SetDiskID(node_disk, node)
1829       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1830       if not result:
1831         logger.Error("could not prepare block device %s on node %s"
1832                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1833         if not ignore_secondaries:
1834           disks_ok = False
1835
1836   # FIXME: race condition on drbd migration to primary
1837
1838   # 2nd pass, do only the primary node
1839   for inst_disk in instance.disks:
1840     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1841       if node != instance.primary_node:
1842         continue
1843       cfg.SetDiskID(node_disk, node)
1844       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1845       if not result:
1846         logger.Error("could not prepare block device %s on node %s"
1847                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1848         disks_ok = False
1849     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1850
1851   # leave the disks configured for the primary node
1852   # this is a workaround that would be fixed better by
1853   # improving the logical/physical id handling
1854   for disk in instance.disks:
1855     cfg.SetDiskID(disk, instance.primary_node)
1856
1857   return disks_ok, device_info
1858
1859
1860 def _StartInstanceDisks(cfg, instance, force):
1861   """Start the disks of an instance.
1862
1863   """
1864   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1865                                            ignore_secondaries=force)
1866   if not disks_ok:
1867     _ShutdownInstanceDisks(instance, cfg)
1868     if force is not None and not force:
1869       logger.Error("If the message above refers to a secondary node,"
1870                    " you can retry the operation using '--force'.")
1871     raise errors.OpExecError("Disk consistency error")
1872
1873
1874 class LUDeactivateInstanceDisks(NoHooksLU):
1875   """Shutdown an instance's disks.
1876
1877   """
1878   _OP_REQP = ["instance_name"]
1879
1880   def CheckPrereq(self):
1881     """Check prerequisites.
1882
1883     This checks that the instance is in the cluster.
1884
1885     """
1886     instance = self.cfg.GetInstanceInfo(
1887       self.cfg.ExpandInstanceName(self.op.instance_name))
1888     if instance is None:
1889       raise errors.OpPrereqError("Instance '%s' not known" %
1890                                  self.op.instance_name)
1891     self.instance = instance
1892
1893   def Exec(self, feedback_fn):
1894     """Deactivate the disks
1895
1896     """
1897     instance = self.instance
1898     ins_l = rpc.call_instance_list([instance.primary_node])
1899     ins_l = ins_l[instance.primary_node]
1900     if not type(ins_l) is list:
1901       raise errors.OpExecError("Can't contact node '%s'" %
1902                                instance.primary_node)
1903
1904     if self.instance.name in ins_l:
1905       raise errors.OpExecError("Instance is running, can't shutdown"
1906                                " block devices.")
1907
1908     _ShutdownInstanceDisks(instance, self.cfg)
1909
1910
1911 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1912   """Shutdown block devices of an instance.
1913
1914   This does the shutdown on all nodes of the instance.
1915
1916   If the ignore_primary is false, errors on the primary node are
1917   ignored.
1918
1919   """
1920   result = True
1921   for disk in instance.disks:
1922     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1923       cfg.SetDiskID(top_disk, node)
1924       if not rpc.call_blockdev_shutdown(node, top_disk):
1925         logger.Error("could not shutdown block device %s on node %s" %
1926                      (disk.iv_name, node))
1927         if not ignore_primary or node != instance.primary_node:
1928           result = False
1929   return result
1930
1931
1932 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1933   """Checks if a node has enough free memory.
1934
1935   This function check if a given node has the needed amount of free
1936   memory. In case the node has less memory or we cannot get the
1937   information from the node, this function raise an OpPrereqError
1938   exception.
1939
1940   Args:
1941     - cfg: a ConfigWriter instance
1942     - node: the node name
1943     - reason: string to use in the error message
1944     - requested: the amount of memory in MiB
1945
1946   """
1947   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1948   if not nodeinfo or not isinstance(nodeinfo, dict):
1949     raise errors.OpPrereqError("Could not contact node %s for resource"
1950                              " information" % (node,))
1951
1952   free_mem = nodeinfo[node].get('memory_free')
1953   if not isinstance(free_mem, int):
1954     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1955                              " was '%s'" % (node, free_mem))
1956   if requested > free_mem:
1957     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1958                              " needed %s MiB, available %s MiB" %
1959                              (node, reason, requested, free_mem))
1960
1961
1962 class LUStartupInstance(LogicalUnit):
1963   """Starts an instance.
1964
1965   """
1966   HPATH = "instance-start"
1967   HTYPE = constants.HTYPE_INSTANCE
1968   _OP_REQP = ["instance_name", "force"]
1969
1970   def BuildHooksEnv(self):
1971     """Build hooks env.
1972
1973     This runs on master, primary and secondary nodes of the instance.
1974
1975     """
1976     env = {
1977       "FORCE": self.op.force,
1978       }
1979     env.update(_BuildInstanceHookEnvByObject(self.instance))
1980     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1981           list(self.instance.secondary_nodes))
1982     return env, nl, nl
1983
1984   def CheckPrereq(self):
1985     """Check prerequisites.
1986
1987     This checks that the instance is in the cluster.
1988
1989     """
1990     instance = self.cfg.GetInstanceInfo(
1991       self.cfg.ExpandInstanceName(self.op.instance_name))
1992     if instance is None:
1993       raise errors.OpPrereqError("Instance '%s' not known" %
1994                                  self.op.instance_name)
1995
1996     # check bridges existance
1997     _CheckInstanceBridgesExist(instance)
1998
1999     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2000                          "starting instance %s" % instance.name,
2001                          instance.memory)
2002
2003     self.instance = instance
2004     self.op.instance_name = instance.name
2005
2006   def Exec(self, feedback_fn):
2007     """Start the instance.
2008
2009     """
2010     instance = self.instance
2011     force = self.op.force
2012     extra_args = getattr(self.op, "extra_args", "")
2013
2014     self.cfg.MarkInstanceUp(instance.name)
2015
2016     node_current = instance.primary_node
2017
2018     _StartInstanceDisks(self.cfg, instance, force)
2019
2020     if not rpc.call_instance_start(node_current, instance, extra_args):
2021       _ShutdownInstanceDisks(instance, self.cfg)
2022       raise errors.OpExecError("Could not start instance")
2023
2024
2025 class LURebootInstance(LogicalUnit):
2026   """Reboot an instance.
2027
2028   """
2029   HPATH = "instance-reboot"
2030   HTYPE = constants.HTYPE_INSTANCE
2031   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2032
2033   def BuildHooksEnv(self):
2034     """Build hooks env.
2035
2036     This runs on master, primary and secondary nodes of the instance.
2037
2038     """
2039     env = {
2040       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2041       }
2042     env.update(_BuildInstanceHookEnvByObject(self.instance))
2043     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2044           list(self.instance.secondary_nodes))
2045     return env, nl, nl
2046
2047   def CheckPrereq(self):
2048     """Check prerequisites.
2049
2050     This checks that the instance is in the cluster.
2051
2052     """
2053     instance = self.cfg.GetInstanceInfo(
2054       self.cfg.ExpandInstanceName(self.op.instance_name))
2055     if instance is None:
2056       raise errors.OpPrereqError("Instance '%s' not known" %
2057                                  self.op.instance_name)
2058
2059     # check bridges existance
2060     _CheckInstanceBridgesExist(instance)
2061
2062     self.instance = instance
2063     self.op.instance_name = instance.name
2064
2065   def Exec(self, feedback_fn):
2066     """Reboot the instance.
2067
2068     """
2069     instance = self.instance
2070     ignore_secondaries = self.op.ignore_secondaries
2071     reboot_type = self.op.reboot_type
2072     extra_args = getattr(self.op, "extra_args", "")
2073
2074     node_current = instance.primary_node
2075
2076     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2077                            constants.INSTANCE_REBOOT_HARD,
2078                            constants.INSTANCE_REBOOT_FULL]:
2079       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2080                                   (constants.INSTANCE_REBOOT_SOFT,
2081                                    constants.INSTANCE_REBOOT_HARD,
2082                                    constants.INSTANCE_REBOOT_FULL))
2083
2084     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2085                        constants.INSTANCE_REBOOT_HARD]:
2086       if not rpc.call_instance_reboot(node_current, instance,
2087                                       reboot_type, extra_args):
2088         raise errors.OpExecError("Could not reboot instance")
2089     else:
2090       if not rpc.call_instance_shutdown(node_current, instance):
2091         raise errors.OpExecError("could not shutdown instance for full reboot")
2092       _ShutdownInstanceDisks(instance, self.cfg)
2093       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2094       if not rpc.call_instance_start(node_current, instance, extra_args):
2095         _ShutdownInstanceDisks(instance, self.cfg)
2096         raise errors.OpExecError("Could not start instance for full reboot")
2097
2098     self.cfg.MarkInstanceUp(instance.name)
2099
2100
2101 class LUShutdownInstance(LogicalUnit):
2102   """Shutdown an instance.
2103
2104   """
2105   HPATH = "instance-stop"
2106   HTYPE = constants.HTYPE_INSTANCE
2107   _OP_REQP = ["instance_name"]
2108
2109   def BuildHooksEnv(self):
2110     """Build hooks env.
2111
2112     This runs on master, primary and secondary nodes of the instance.
2113
2114     """
2115     env = _BuildInstanceHookEnvByObject(self.instance)
2116     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2117           list(self.instance.secondary_nodes))
2118     return env, nl, nl
2119
2120   def CheckPrereq(self):
2121     """Check prerequisites.
2122
2123     This checks that the instance is in the cluster.
2124
2125     """
2126     instance = self.cfg.GetInstanceInfo(
2127       self.cfg.ExpandInstanceName(self.op.instance_name))
2128     if instance is None:
2129       raise errors.OpPrereqError("Instance '%s' not known" %
2130                                  self.op.instance_name)
2131     self.instance = instance
2132
2133   def Exec(self, feedback_fn):
2134     """Shutdown the instance.
2135
2136     """
2137     instance = self.instance
2138     node_current = instance.primary_node
2139     self.cfg.MarkInstanceDown(instance.name)
2140     if not rpc.call_instance_shutdown(node_current, instance):
2141       logger.Error("could not shutdown instance")
2142
2143     _ShutdownInstanceDisks(instance, self.cfg)
2144
2145
2146 class LUReinstallInstance(LogicalUnit):
2147   """Reinstall an instance.
2148
2149   """
2150   HPATH = "instance-reinstall"
2151   HTYPE = constants.HTYPE_INSTANCE
2152   _OP_REQP = ["instance_name"]
2153
2154   def BuildHooksEnv(self):
2155     """Build hooks env.
2156
2157     This runs on master, primary and secondary nodes of the instance.
2158
2159     """
2160     env = _BuildInstanceHookEnvByObject(self.instance)
2161     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2162           list(self.instance.secondary_nodes))
2163     return env, nl, nl
2164
2165   def CheckPrereq(self):
2166     """Check prerequisites.
2167
2168     This checks that the instance is in the cluster and is not running.
2169
2170     """
2171     instance = self.cfg.GetInstanceInfo(
2172       self.cfg.ExpandInstanceName(self.op.instance_name))
2173     if instance is None:
2174       raise errors.OpPrereqError("Instance '%s' not known" %
2175                                  self.op.instance_name)
2176     if instance.disk_template == constants.DT_DISKLESS:
2177       raise errors.OpPrereqError("Instance '%s' has no disks" %
2178                                  self.op.instance_name)
2179     if instance.status != "down":
2180       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2181                                  self.op.instance_name)
2182     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2183     if remote_info:
2184       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2185                                  (self.op.instance_name,
2186                                   instance.primary_node))
2187
2188     self.op.os_type = getattr(self.op, "os_type", None)
2189     if self.op.os_type is not None:
2190       # OS verification
2191       pnode = self.cfg.GetNodeInfo(
2192         self.cfg.ExpandNodeName(instance.primary_node))
2193       if pnode is None:
2194         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2195                                    self.op.pnode)
2196       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2197       if not os_obj:
2198         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2199                                    " primary node"  % self.op.os_type)
2200
2201     self.instance = instance
2202
2203   def Exec(self, feedback_fn):
2204     """Reinstall the instance.
2205
2206     """
2207     inst = self.instance
2208
2209     if self.op.os_type is not None:
2210       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2211       inst.os = self.op.os_type
2212       self.cfg.AddInstance(inst)
2213
2214     _StartInstanceDisks(self.cfg, inst, None)
2215     try:
2216       feedback_fn("Running the instance OS create scripts...")
2217       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2218         raise errors.OpExecError("Could not install OS for instance %s"
2219                                  " on node %s" %
2220                                  (inst.name, inst.primary_node))
2221     finally:
2222       _ShutdownInstanceDisks(inst, self.cfg)
2223
2224
2225 class LURenameInstance(LogicalUnit):
2226   """Rename an instance.
2227
2228   """
2229   HPATH = "instance-rename"
2230   HTYPE = constants.HTYPE_INSTANCE
2231   _OP_REQP = ["instance_name", "new_name"]
2232
2233   def BuildHooksEnv(self):
2234     """Build hooks env.
2235
2236     This runs on master, primary and secondary nodes of the instance.
2237
2238     """
2239     env = _BuildInstanceHookEnvByObject(self.instance)
2240     env["INSTANCE_NEW_NAME"] = self.op.new_name
2241     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2242           list(self.instance.secondary_nodes))
2243     return env, nl, nl
2244
2245   def CheckPrereq(self):
2246     """Check prerequisites.
2247
2248     This checks that the instance is in the cluster and is not running.
2249
2250     """
2251     instance = self.cfg.GetInstanceInfo(
2252       self.cfg.ExpandInstanceName(self.op.instance_name))
2253     if instance is None:
2254       raise errors.OpPrereqError("Instance '%s' not known" %
2255                                  self.op.instance_name)
2256     if instance.status != "down":
2257       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2258                                  self.op.instance_name)
2259     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2260     if remote_info:
2261       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2262                                  (self.op.instance_name,
2263                                   instance.primary_node))
2264     self.instance = instance
2265
2266     # new name verification
2267     name_info = utils.HostInfo(self.op.new_name)
2268
2269     self.op.new_name = new_name = name_info.name
2270     instance_list = self.cfg.GetInstanceList()
2271     if new_name in instance_list:
2272       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2273                                  instance_name)
2274
2275     if not getattr(self.op, "ignore_ip", False):
2276       command = ["fping", "-q", name_info.ip]
2277       result = utils.RunCmd(command)
2278       if not result.failed:
2279         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2280                                    (name_info.ip, new_name))
2281
2282
2283   def Exec(self, feedback_fn):
2284     """Reinstall the instance.
2285
2286     """
2287     inst = self.instance
2288     old_name = inst.name
2289
2290     self.cfg.RenameInstance(inst.name, self.op.new_name)
2291
2292     # re-read the instance from the configuration after rename
2293     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2294
2295     _StartInstanceDisks(self.cfg, inst, None)
2296     try:
2297       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2298                                           "sda", "sdb"):
2299         msg = ("Could run OS rename script for instance %s on node %s (but the"
2300                " instance has been renamed in Ganeti)" %
2301                (inst.name, inst.primary_node))
2302         logger.Error(msg)
2303     finally:
2304       _ShutdownInstanceDisks(inst, self.cfg)
2305
2306
2307 class LURemoveInstance(LogicalUnit):
2308   """Remove an instance.
2309
2310   """
2311   HPATH = "instance-remove"
2312   HTYPE = constants.HTYPE_INSTANCE
2313   _OP_REQP = ["instance_name"]
2314
2315   def BuildHooksEnv(self):
2316     """Build hooks env.
2317
2318     This runs on master, primary and secondary nodes of the instance.
2319
2320     """
2321     env = _BuildInstanceHookEnvByObject(self.instance)
2322     nl = [self.sstore.GetMasterNode()]
2323     return env, nl, nl
2324
2325   def CheckPrereq(self):
2326     """Check prerequisites.
2327
2328     This checks that the instance is in the cluster.
2329
2330     """
2331     instance = self.cfg.GetInstanceInfo(
2332       self.cfg.ExpandInstanceName(self.op.instance_name))
2333     if instance is None:
2334       raise errors.OpPrereqError("Instance '%s' not known" %
2335                                  self.op.instance_name)
2336     self.instance = instance
2337
2338   def Exec(self, feedback_fn):
2339     """Remove the instance.
2340
2341     """
2342     instance = self.instance
2343     logger.Info("shutting down instance %s on node %s" %
2344                 (instance.name, instance.primary_node))
2345
2346     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2347       if self.op.ignore_failures:
2348         feedback_fn("Warning: can't shutdown instance")
2349       else:
2350         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2351                                  (instance.name, instance.primary_node))
2352
2353     logger.Info("removing block devices for instance %s" % instance.name)
2354
2355     if not _RemoveDisks(instance, self.cfg):
2356       if self.op.ignore_failures:
2357         feedback_fn("Warning: can't remove instance's disks")
2358       else:
2359         raise errors.OpExecError("Can't remove instance's disks")
2360
2361     logger.Info("removing instance %s out of cluster config" % instance.name)
2362
2363     self.cfg.RemoveInstance(instance.name)
2364
2365
2366 class LUQueryInstances(NoHooksLU):
2367   """Logical unit for querying instances.
2368
2369   """
2370   _OP_REQP = ["output_fields", "names"]
2371
2372   def CheckPrereq(self):
2373     """Check prerequisites.
2374
2375     This checks that the fields required are valid output fields.
2376
2377     """
2378     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2379     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2380                                "admin_state", "admin_ram",
2381                                "disk_template", "ip", "mac", "bridge",
2382                                "sda_size", "sdb_size", "vcpus"],
2383                        dynamic=self.dynamic_fields,
2384                        selected=self.op.output_fields)
2385
2386     self.wanted = _GetWantedInstances(self, self.op.names)
2387
2388   def Exec(self, feedback_fn):
2389     """Computes the list of nodes and their attributes.
2390
2391     """
2392     instance_names = self.wanted
2393     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2394                      in instance_names]
2395
2396     # begin data gathering
2397
2398     nodes = frozenset([inst.primary_node for inst in instance_list])
2399
2400     bad_nodes = []
2401     if self.dynamic_fields.intersection(self.op.output_fields):
2402       live_data = {}
2403       node_data = rpc.call_all_instances_info(nodes)
2404       for name in nodes:
2405         result = node_data[name]
2406         if result:
2407           live_data.update(result)
2408         elif result == False:
2409           bad_nodes.append(name)
2410         # else no instance is alive
2411     else:
2412       live_data = dict([(name, {}) for name in instance_names])
2413
2414     # end data gathering
2415
2416     output = []
2417     for instance in instance_list:
2418       iout = []
2419       for field in self.op.output_fields:
2420         if field == "name":
2421           val = instance.name
2422         elif field == "os":
2423           val = instance.os
2424         elif field == "pnode":
2425           val = instance.primary_node
2426         elif field == "snodes":
2427           val = list(instance.secondary_nodes)
2428         elif field == "admin_state":
2429           val = (instance.status != "down")
2430         elif field == "oper_state":
2431           if instance.primary_node in bad_nodes:
2432             val = None
2433           else:
2434             val = bool(live_data.get(instance.name))
2435         elif field == "status":
2436           if instance.primary_node in bad_nodes:
2437             val = "ERROR_nodedown"
2438           else:
2439             running = bool(live_data.get(instance.name))
2440             if running:
2441               if instance.status != "down":
2442                 val = "running"
2443               else:
2444                 val = "ERROR_up"
2445             else:
2446               if instance.status != "down":
2447                 val = "ERROR_down"
2448               else:
2449                 val = "ADMIN_down"
2450         elif field == "admin_ram":
2451           val = instance.memory
2452         elif field == "oper_ram":
2453           if instance.primary_node in bad_nodes:
2454             val = None
2455           elif instance.name in live_data:
2456             val = live_data[instance.name].get("memory", "?")
2457           else:
2458             val = "-"
2459         elif field == "disk_template":
2460           val = instance.disk_template
2461         elif field == "ip":
2462           val = instance.nics[0].ip
2463         elif field == "bridge":
2464           val = instance.nics[0].bridge
2465         elif field == "mac":
2466           val = instance.nics[0].mac
2467         elif field == "sda_size" or field == "sdb_size":
2468           disk = instance.FindDisk(field[:3])
2469           if disk is None:
2470             val = None
2471           else:
2472             val = disk.size
2473         elif field == "vcpus":
2474           val = instance.vcpus
2475         else:
2476           raise errors.ParameterError(field)
2477         iout.append(val)
2478       output.append(iout)
2479
2480     return output
2481
2482
2483 class LUFailoverInstance(LogicalUnit):
2484   """Failover an instance.
2485
2486   """
2487   HPATH = "instance-failover"
2488   HTYPE = constants.HTYPE_INSTANCE
2489   _OP_REQP = ["instance_name", "ignore_consistency"]
2490
2491   def BuildHooksEnv(self):
2492     """Build hooks env.
2493
2494     This runs on master, primary and secondary nodes of the instance.
2495
2496     """
2497     env = {
2498       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2499       }
2500     env.update(_BuildInstanceHookEnvByObject(self.instance))
2501     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2502     return env, nl, nl
2503
2504   def CheckPrereq(self):
2505     """Check prerequisites.
2506
2507     This checks that the instance is in the cluster.
2508
2509     """
2510     instance = self.cfg.GetInstanceInfo(
2511       self.cfg.ExpandInstanceName(self.op.instance_name))
2512     if instance is None:
2513       raise errors.OpPrereqError("Instance '%s' not known" %
2514                                  self.op.instance_name)
2515
2516     if instance.disk_template not in constants.DTS_NET_MIRROR:
2517       raise errors.OpPrereqError("Instance's disk layout is not"
2518                                  " network mirrored, cannot failover.")
2519
2520     secondary_nodes = instance.secondary_nodes
2521     if not secondary_nodes:
2522       raise errors.ProgrammerError("no secondary node but using "
2523                                    "DT_REMOTE_RAID1 template")
2524
2525     target_node = secondary_nodes[0]
2526     # check memory requirements on the secondary node
2527     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2528                          instance.name, instance.memory)
2529
2530     # check bridge existance
2531     brlist = [nic.bridge for nic in instance.nics]
2532     if not rpc.call_bridges_exist(target_node, brlist):
2533       raise errors.OpPrereqError("One or more target bridges %s does not"
2534                                  " exist on destination node '%s'" %
2535                                  (brlist, target_node))
2536
2537     self.instance = instance
2538
2539   def Exec(self, feedback_fn):
2540     """Failover an instance.
2541
2542     The failover is done by shutting it down on its present node and
2543     starting it on the secondary.
2544
2545     """
2546     instance = self.instance
2547
2548     source_node = instance.primary_node
2549     target_node = instance.secondary_nodes[0]
2550
2551     feedback_fn("* checking disk consistency between source and target")
2552     for dev in instance.disks:
2553       # for remote_raid1, these are md over drbd
2554       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2555         if not self.op.ignore_consistency:
2556           raise errors.OpExecError("Disk %s is degraded on target node,"
2557                                    " aborting failover." % dev.iv_name)
2558
2559     feedback_fn("* shutting down instance on source node")
2560     logger.Info("Shutting down instance %s on node %s" %
2561                 (instance.name, source_node))
2562
2563     if not rpc.call_instance_shutdown(source_node, instance):
2564       if self.op.ignore_consistency:
2565         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2566                      " anyway. Please make sure node %s is down"  %
2567                      (instance.name, source_node, source_node))
2568       else:
2569         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2570                                  (instance.name, source_node))
2571
2572     feedback_fn("* deactivating the instance's disks on source node")
2573     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2574       raise errors.OpExecError("Can't shut down the instance's disks.")
2575
2576     instance.primary_node = target_node
2577     # distribute new instance config to the other nodes
2578     self.cfg.AddInstance(instance)
2579
2580     feedback_fn("* activating the instance's disks on target node")
2581     logger.Info("Starting instance %s on node %s" %
2582                 (instance.name, target_node))
2583
2584     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2585                                              ignore_secondaries=True)
2586     if not disks_ok:
2587       _ShutdownInstanceDisks(instance, self.cfg)
2588       raise errors.OpExecError("Can't activate the instance's disks")
2589
2590     feedback_fn("* starting the instance on the target node")
2591     if not rpc.call_instance_start(target_node, instance, None):
2592       _ShutdownInstanceDisks(instance, self.cfg)
2593       raise errors.OpExecError("Could not start instance %s on node %s." %
2594                                (instance.name, target_node))
2595
2596
2597 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2598   """Create a tree of block devices on the primary node.
2599
2600   This always creates all devices.
2601
2602   """
2603   if device.children:
2604     for child in device.children:
2605       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2606         return False
2607
2608   cfg.SetDiskID(device, node)
2609   new_id = rpc.call_blockdev_create(node, device, device.size,
2610                                     instance.name, True, info)
2611   if not new_id:
2612     return False
2613   if device.physical_id is None:
2614     device.physical_id = new_id
2615   return True
2616
2617
2618 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2619   """Create a tree of block devices on a secondary node.
2620
2621   If this device type has to be created on secondaries, create it and
2622   all its children.
2623
2624   If not, just recurse to children keeping the same 'force' value.
2625
2626   """
2627   if device.CreateOnSecondary():
2628     force = True
2629   if device.children:
2630     for child in device.children:
2631       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2632                                         child, force, info):
2633         return False
2634
2635   if not force:
2636     return True
2637   cfg.SetDiskID(device, node)
2638   new_id = rpc.call_blockdev_create(node, device, device.size,
2639                                     instance.name, False, info)
2640   if not new_id:
2641     return False
2642   if device.physical_id is None:
2643     device.physical_id = new_id
2644   return True
2645
2646
2647 def _GenerateUniqueNames(cfg, exts):
2648   """Generate a suitable LV name.
2649
2650   This will generate a logical volume name for the given instance.
2651
2652   """
2653   results = []
2654   for val in exts:
2655     new_id = cfg.GenerateUniqueID()
2656     results.append("%s%s" % (new_id, val))
2657   return results
2658
2659
2660 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2661   """Generate a drbd device complete with its children.
2662
2663   """
2664   port = cfg.AllocatePort()
2665   vgname = cfg.GetVGName()
2666   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2667                           logical_id=(vgname, names[0]))
2668   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2669                           logical_id=(vgname, names[1]))
2670   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2671                           logical_id = (primary, secondary, port),
2672                           children = [dev_data, dev_meta])
2673   return drbd_dev
2674
2675
2676 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2677   """Generate a drbd8 device complete with its children.
2678
2679   """
2680   port = cfg.AllocatePort()
2681   vgname = cfg.GetVGName()
2682   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2683                           logical_id=(vgname, names[0]))
2684   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2685                           logical_id=(vgname, names[1]))
2686   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2687                           logical_id = (primary, secondary, port),
2688                           children = [dev_data, dev_meta],
2689                           iv_name=iv_name)
2690   return drbd_dev
2691
2692
2693 def _GenerateDiskTemplate(cfg, template_name,
2694                           instance_name, primary_node,
2695                           secondary_nodes, disk_sz, swap_sz):
2696   """Generate the entire disk layout for a given template type.
2697
2698   """
2699   #TODO: compute space requirements
2700
2701   vgname = cfg.GetVGName()
2702   if template_name == constants.DT_DISKLESS:
2703     disks = []
2704   elif template_name == constants.DT_PLAIN:
2705     if len(secondary_nodes) != 0:
2706       raise errors.ProgrammerError("Wrong template configuration")
2707
2708     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2709     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2710                            logical_id=(vgname, names[0]),
2711                            iv_name = "sda")
2712     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2713                            logical_id=(vgname, names[1]),
2714                            iv_name = "sdb")
2715     disks = [sda_dev, sdb_dev]
2716   elif template_name == constants.DT_DRBD8:
2717     if len(secondary_nodes) != 1:
2718       raise errors.ProgrammerError("Wrong template configuration")
2719     remote_node = secondary_nodes[0]
2720     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2721                                        ".sdb_data", ".sdb_meta"])
2722     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2723                                          disk_sz, names[0:2], "sda")
2724     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2725                                          swap_sz, names[2:4], "sdb")
2726     disks = [drbd_sda_dev, drbd_sdb_dev]
2727   else:
2728     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2729   return disks
2730
2731
2732 def _GetInstanceInfoText(instance):
2733   """Compute that text that should be added to the disk's metadata.
2734
2735   """
2736   return "originstname+%s" % instance.name
2737
2738
2739 def _CreateDisks(cfg, instance):
2740   """Create all disks for an instance.
2741
2742   This abstracts away some work from AddInstance.
2743
2744   Args:
2745     instance: the instance object
2746
2747   Returns:
2748     True or False showing the success of the creation process
2749
2750   """
2751   info = _GetInstanceInfoText(instance)
2752
2753   for device in instance.disks:
2754     logger.Info("creating volume %s for instance %s" %
2755               (device.iv_name, instance.name))
2756     #HARDCODE
2757     for secondary_node in instance.secondary_nodes:
2758       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2759                                         device, False, info):
2760         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2761                      (device.iv_name, device, secondary_node))
2762         return False
2763     #HARDCODE
2764     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2765                                     instance, device, info):
2766       logger.Error("failed to create volume %s on primary!" %
2767                    device.iv_name)
2768       return False
2769   return True
2770
2771
2772 def _RemoveDisks(instance, cfg):
2773   """Remove all disks for an instance.
2774
2775   This abstracts away some work from `AddInstance()` and
2776   `RemoveInstance()`. Note that in case some of the devices couldn't
2777   be removed, the removal will continue with the other ones (compare
2778   with `_CreateDisks()`).
2779
2780   Args:
2781     instance: the instance object
2782
2783   Returns:
2784     True or False showing the success of the removal proces
2785
2786   """
2787   logger.Info("removing block devices for instance %s" % instance.name)
2788
2789   result = True
2790   for device in instance.disks:
2791     for node, disk in device.ComputeNodeTree(instance.primary_node):
2792       cfg.SetDiskID(disk, node)
2793       if not rpc.call_blockdev_remove(node, disk):
2794         logger.Error("could not remove block device %s on node %s,"
2795                      " continuing anyway" %
2796                      (device.iv_name, node))
2797         result = False
2798   return result
2799
2800
2801 class LUCreateInstance(LogicalUnit):
2802   """Create an instance.
2803
2804   """
2805   HPATH = "instance-add"
2806   HTYPE = constants.HTYPE_INSTANCE
2807   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2808               "disk_template", "swap_size", "mode", "start", "vcpus",
2809               "wait_for_sync", "ip_check", "mac"]
2810
2811   def BuildHooksEnv(self):
2812     """Build hooks env.
2813
2814     This runs on master, primary and secondary nodes of the instance.
2815
2816     """
2817     env = {
2818       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2819       "INSTANCE_DISK_SIZE": self.op.disk_size,
2820       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2821       "INSTANCE_ADD_MODE": self.op.mode,
2822       }
2823     if self.op.mode == constants.INSTANCE_IMPORT:
2824       env["INSTANCE_SRC_NODE"] = self.op.src_node
2825       env["INSTANCE_SRC_PATH"] = self.op.src_path
2826       env["INSTANCE_SRC_IMAGE"] = self.src_image
2827
2828     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2829       primary_node=self.op.pnode,
2830       secondary_nodes=self.secondaries,
2831       status=self.instance_status,
2832       os_type=self.op.os_type,
2833       memory=self.op.mem_size,
2834       vcpus=self.op.vcpus,
2835       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2836     ))
2837
2838     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2839           self.secondaries)
2840     return env, nl, nl
2841
2842
2843   def CheckPrereq(self):
2844     """Check prerequisites.
2845
2846     """
2847     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2848       if not hasattr(self.op, attr):
2849         setattr(self.op, attr, None)
2850
2851     if self.op.mode not in (constants.INSTANCE_CREATE,
2852                             constants.INSTANCE_IMPORT):
2853       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2854                                  self.op.mode)
2855
2856     if self.op.mode == constants.INSTANCE_IMPORT:
2857       src_node = getattr(self.op, "src_node", None)
2858       src_path = getattr(self.op, "src_path", None)
2859       if src_node is None or src_path is None:
2860         raise errors.OpPrereqError("Importing an instance requires source"
2861                                    " node and path options")
2862       src_node_full = self.cfg.ExpandNodeName(src_node)
2863       if src_node_full is None:
2864         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2865       self.op.src_node = src_node = src_node_full
2866
2867       if not os.path.isabs(src_path):
2868         raise errors.OpPrereqError("The source path must be absolute")
2869
2870       export_info = rpc.call_export_info(src_node, src_path)
2871
2872       if not export_info:
2873         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2874
2875       if not export_info.has_section(constants.INISECT_EXP):
2876         raise errors.ProgrammerError("Corrupted export config")
2877
2878       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2879       if (int(ei_version) != constants.EXPORT_VERSION):
2880         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2881                                    (ei_version, constants.EXPORT_VERSION))
2882
2883       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2884         raise errors.OpPrereqError("Can't import instance with more than"
2885                                    " one data disk")
2886
2887       # FIXME: are the old os-es, disk sizes, etc. useful?
2888       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2889       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2890                                                          'disk0_dump'))
2891       self.src_image = diskimage
2892     else: # INSTANCE_CREATE
2893       if getattr(self.op, "os_type", None) is None:
2894         raise errors.OpPrereqError("No guest OS specified")
2895
2896     # check primary node
2897     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2898     if pnode is None:
2899       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2900                                  self.op.pnode)
2901     self.op.pnode = pnode.name
2902     self.pnode = pnode
2903     self.secondaries = []
2904     # disk template and mirror node verification
2905     if self.op.disk_template not in constants.DISK_TEMPLATES:
2906       raise errors.OpPrereqError("Invalid disk template name")
2907
2908     if self.op.disk_template in constants.DTS_NET_MIRROR:
2909       if getattr(self.op, "snode", None) is None:
2910         raise errors.OpPrereqError("The networked disk templates need"
2911                                    " a mirror node")
2912
2913       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2914       if snode_name is None:
2915         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2916                                    self.op.snode)
2917       elif snode_name == pnode.name:
2918         raise errors.OpPrereqError("The secondary node cannot be"
2919                                    " the primary node.")
2920       self.secondaries.append(snode_name)
2921
2922     # Required free disk space as a function of disk and swap space
2923     req_size_dict = {
2924       constants.DT_DISKLESS: None,
2925       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2926       # 256 MB are added for drbd metadata, 128MB for each drbd device
2927       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2928     }
2929
2930     if self.op.disk_template not in req_size_dict:
2931       raise errors.ProgrammerError("Disk template '%s' size requirement"
2932                                    " is unknown" %  self.op.disk_template)
2933
2934     req_size = req_size_dict[self.op.disk_template]
2935
2936     # Check lv size requirements
2937     if req_size is not None:
2938       nodenames = [pnode.name] + self.secondaries
2939       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2940       for node in nodenames:
2941         info = nodeinfo.get(node, None)
2942         if not info:
2943           raise errors.OpPrereqError("Cannot get current information"
2944                                      " from node '%s'" % nodeinfo)
2945         vg_free = info.get('vg_free', None)
2946         if not isinstance(vg_free, int):
2947           raise errors.OpPrereqError("Can't compute free disk space on"
2948                                      " node %s" % node)
2949         if req_size > info['vg_free']:
2950           raise errors.OpPrereqError("Not enough disk space on target node %s."
2951                                      " %d MB available, %d MB required" %
2952                                      (node, info['vg_free'], req_size))
2953
2954     # os verification
2955     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2956     if not os_obj:
2957       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2958                                  " primary node"  % self.op.os_type)
2959
2960     if self.op.kernel_path == constants.VALUE_NONE:
2961       raise errors.OpPrereqError("Can't set instance kernel to none")
2962
2963     # instance verification
2964     hostname1 = utils.HostInfo(self.op.instance_name)
2965
2966     self.op.instance_name = instance_name = hostname1.name
2967     instance_list = self.cfg.GetInstanceList()
2968     if instance_name in instance_list:
2969       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2970                                  instance_name)
2971
2972     ip = getattr(self.op, "ip", None)
2973     if ip is None or ip.lower() == "none":
2974       inst_ip = None
2975     elif ip.lower() == "auto":
2976       inst_ip = hostname1.ip
2977     else:
2978       if not utils.IsValidIP(ip):
2979         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2980                                    " like a valid IP" % ip)
2981       inst_ip = ip
2982     self.inst_ip = inst_ip
2983
2984     if self.op.start and not self.op.ip_check:
2985       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2986                                  " adding an instance in start mode")
2987
2988     if self.op.ip_check:
2989       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
2990         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2991                                    (hostname1.ip, instance_name))
2992
2993     # MAC address verification
2994     if self.op.mac != "auto":
2995       if not utils.IsValidMac(self.op.mac.lower()):
2996         raise errors.OpPrereqError("invalid MAC address specified: %s" %
2997                                    self.op.mac)
2998
2999     # bridge verification
3000     bridge = getattr(self.op, "bridge", None)
3001     if bridge is None:
3002       self.op.bridge = self.cfg.GetDefBridge()
3003     else:
3004       self.op.bridge = bridge
3005
3006     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3007       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3008                                  " destination node '%s'" %
3009                                  (self.op.bridge, pnode.name))
3010
3011     # boot order verification
3012     if self.op.hvm_boot_order is not None:
3013       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3014         raise errors.OpPrereqError("invalid boot order specified,"
3015                                    " must be one or more of [acdn]")
3016
3017     if self.op.start:
3018       self.instance_status = 'up'
3019     else:
3020       self.instance_status = 'down'
3021
3022   def Exec(self, feedback_fn):
3023     """Create and add the instance to the cluster.
3024
3025     """
3026     instance = self.op.instance_name
3027     pnode_name = self.pnode.name
3028
3029     if self.op.mac == "auto":
3030       mac_address = self.cfg.GenerateMAC()
3031     else:
3032       mac_address = self.op.mac
3033
3034     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3035     if self.inst_ip is not None:
3036       nic.ip = self.inst_ip
3037
3038     ht_kind = self.sstore.GetHypervisorType()
3039     if ht_kind in constants.HTS_REQ_PORT:
3040       network_port = self.cfg.AllocatePort()
3041     else:
3042       network_port = None
3043
3044     disks = _GenerateDiskTemplate(self.cfg,
3045                                   self.op.disk_template,
3046                                   instance, pnode_name,
3047                                   self.secondaries, self.op.disk_size,
3048                                   self.op.swap_size)
3049
3050     iobj = objects.Instance(name=instance, os=self.op.os_type,
3051                             primary_node=pnode_name,
3052                             memory=self.op.mem_size,
3053                             vcpus=self.op.vcpus,
3054                             nics=[nic], disks=disks,
3055                             disk_template=self.op.disk_template,
3056                             status=self.instance_status,
3057                             network_port=network_port,
3058                             kernel_path=self.op.kernel_path,
3059                             initrd_path=self.op.initrd_path,
3060                             hvm_boot_order=self.op.hvm_boot_order,
3061                             )
3062
3063     feedback_fn("* creating instance disks...")
3064     if not _CreateDisks(self.cfg, iobj):
3065       _RemoveDisks(iobj, self.cfg)
3066       raise errors.OpExecError("Device creation failed, reverting...")
3067
3068     feedback_fn("adding instance %s to cluster config" % instance)
3069
3070     self.cfg.AddInstance(iobj)
3071
3072     if self.op.wait_for_sync:
3073       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3074     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3075       # make sure the disks are not degraded (still sync-ing is ok)
3076       time.sleep(15)
3077       feedback_fn("* checking mirrors status")
3078       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3079     else:
3080       disk_abort = False
3081
3082     if disk_abort:
3083       _RemoveDisks(iobj, self.cfg)
3084       self.cfg.RemoveInstance(iobj.name)
3085       raise errors.OpExecError("There are some degraded disks for"
3086                                " this instance")
3087
3088     feedback_fn("creating os for instance %s on node %s" %
3089                 (instance, pnode_name))
3090
3091     if iobj.disk_template != constants.DT_DISKLESS:
3092       if self.op.mode == constants.INSTANCE_CREATE:
3093         feedback_fn("* running the instance OS create scripts...")
3094         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3095           raise errors.OpExecError("could not add os for instance %s"
3096                                    " on node %s" %
3097                                    (instance, pnode_name))
3098
3099       elif self.op.mode == constants.INSTANCE_IMPORT:
3100         feedback_fn("* running the instance OS import scripts...")
3101         src_node = self.op.src_node
3102         src_image = self.src_image
3103         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3104                                                 src_node, src_image):
3105           raise errors.OpExecError("Could not import os for instance"
3106                                    " %s on node %s" %
3107                                    (instance, pnode_name))
3108       else:
3109         # also checked in the prereq part
3110         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3111                                      % self.op.mode)
3112
3113     if self.op.start:
3114       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3115       feedback_fn("* starting instance...")
3116       if not rpc.call_instance_start(pnode_name, iobj, None):
3117         raise errors.OpExecError("Could not start instance")
3118
3119
3120 class LUConnectConsole(NoHooksLU):
3121   """Connect to an instance's console.
3122
3123   This is somewhat special in that it returns the command line that
3124   you need to run on the master node in order to connect to the
3125   console.
3126
3127   """
3128   _OP_REQP = ["instance_name"]
3129
3130   def CheckPrereq(self):
3131     """Check prerequisites.
3132
3133     This checks that the instance is in the cluster.
3134
3135     """
3136     instance = self.cfg.GetInstanceInfo(
3137       self.cfg.ExpandInstanceName(self.op.instance_name))
3138     if instance is None:
3139       raise errors.OpPrereqError("Instance '%s' not known" %
3140                                  self.op.instance_name)
3141     self.instance = instance
3142
3143   def Exec(self, feedback_fn):
3144     """Connect to the console of an instance
3145
3146     """
3147     instance = self.instance
3148     node = instance.primary_node
3149
3150     node_insts = rpc.call_instance_list([node])[node]
3151     if node_insts is False:
3152       raise errors.OpExecError("Can't connect to node %s." % node)
3153
3154     if instance.name not in node_insts:
3155       raise errors.OpExecError("Instance %s is not running." % instance.name)
3156
3157     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3158
3159     hyper = hypervisor.GetHypervisor()
3160     console_cmd = hyper.GetShellCommandForConsole(instance)
3161
3162     # build ssh cmdline
3163     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3164
3165
3166 class LUReplaceDisks(LogicalUnit):
3167   """Replace the disks of an instance.
3168
3169   """
3170   HPATH = "mirrors-replace"
3171   HTYPE = constants.HTYPE_INSTANCE
3172   _OP_REQP = ["instance_name", "mode", "disks"]
3173
3174   def BuildHooksEnv(self):
3175     """Build hooks env.
3176
3177     This runs on the master, the primary and all the secondaries.
3178
3179     """
3180     env = {
3181       "MODE": self.op.mode,
3182       "NEW_SECONDARY": self.op.remote_node,
3183       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3184       }
3185     env.update(_BuildInstanceHookEnvByObject(self.instance))
3186     nl = [
3187       self.sstore.GetMasterNode(),
3188       self.instance.primary_node,
3189       ]
3190     if self.op.remote_node is not None:
3191       nl.append(self.op.remote_node)
3192     return env, nl, nl
3193
3194   def CheckPrereq(self):
3195     """Check prerequisites.
3196
3197     This checks that the instance is in the cluster.
3198
3199     """
3200     instance = self.cfg.GetInstanceInfo(
3201       self.cfg.ExpandInstanceName(self.op.instance_name))
3202     if instance is None:
3203       raise errors.OpPrereqError("Instance '%s' not known" %
3204                                  self.op.instance_name)
3205     self.instance = instance
3206     self.op.instance_name = instance.name
3207
3208     if instance.disk_template not in constants.DTS_NET_MIRROR:
3209       raise errors.OpPrereqError("Instance's disk layout is not"
3210                                  " network mirrored.")
3211
3212     if len(instance.secondary_nodes) != 1:
3213       raise errors.OpPrereqError("The instance has a strange layout,"
3214                                  " expected one secondary but found %d" %
3215                                  len(instance.secondary_nodes))
3216
3217     self.sec_node = instance.secondary_nodes[0]
3218
3219     remote_node = getattr(self.op, "remote_node", None)
3220     if remote_node is not None:
3221       remote_node = self.cfg.ExpandNodeName(remote_node)
3222       if remote_node is None:
3223         raise errors.OpPrereqError("Node '%s' not known" %
3224                                    self.op.remote_node)
3225       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3226     else:
3227       self.remote_node_info = None
3228     if remote_node == instance.primary_node:
3229       raise errors.OpPrereqError("The specified node is the primary node of"
3230                                  " the instance.")
3231     elif remote_node == self.sec_node:
3232       if self.op.mode == constants.REPLACE_DISK_SEC:
3233         # this is for DRBD8, where we can't execute the same mode of
3234         # replacement as for drbd7 (no different port allocated)
3235         raise errors.OpPrereqError("Same secondary given, cannot execute"
3236                                    " replacement")
3237       # the user gave the current secondary, switch to
3238       # 'no-replace-secondary' mode for drbd7
3239       remote_node = None
3240     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3241         self.op.mode != constants.REPLACE_DISK_ALL):
3242       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3243                                  " disks replacement, not individual ones")
3244     if instance.disk_template == constants.DT_DRBD8:
3245       if (self.op.mode == constants.REPLACE_DISK_ALL and
3246           remote_node is not None):
3247         # switch to replace secondary mode
3248         self.op.mode = constants.REPLACE_DISK_SEC
3249
3250       if self.op.mode == constants.REPLACE_DISK_ALL:
3251         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3252                                    " secondary disk replacement, not"
3253                                    " both at once")
3254       elif self.op.mode == constants.REPLACE_DISK_PRI:
3255         if remote_node is not None:
3256           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3257                                      " the secondary while doing a primary"
3258                                      " node disk replacement")
3259         self.tgt_node = instance.primary_node
3260         self.oth_node = instance.secondary_nodes[0]
3261       elif self.op.mode == constants.REPLACE_DISK_SEC:
3262         self.new_node = remote_node # this can be None, in which case
3263                                     # we don't change the secondary
3264         self.tgt_node = instance.secondary_nodes[0]
3265         self.oth_node = instance.primary_node
3266       else:
3267         raise errors.ProgrammerError("Unhandled disk replace mode")
3268
3269     for name in self.op.disks:
3270       if instance.FindDisk(name) is None:
3271         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3272                                    (name, instance.name))
3273     self.op.remote_node = remote_node
3274
3275   def _ExecRR1(self, feedback_fn):
3276     """Replace the disks of an instance.
3277
3278     """
3279     instance = self.instance
3280     iv_names = {}
3281     # start of work
3282     if self.op.remote_node is None:
3283       remote_node = self.sec_node
3284     else:
3285       remote_node = self.op.remote_node
3286     cfg = self.cfg
3287     for dev in instance.disks:
3288       size = dev.size
3289       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3290       names = _GenerateUniqueNames(cfg, lv_names)
3291       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3292                                        remote_node, size, names)
3293       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3294       logger.Info("adding new mirror component on secondary for %s" %
3295                   dev.iv_name)
3296       #HARDCODE
3297       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3298                                         new_drbd, False,
3299                                         _GetInstanceInfoText(instance)):
3300         raise errors.OpExecError("Failed to create new component on secondary"
3301                                  " node %s. Full abort, cleanup manually!" %
3302                                  remote_node)
3303
3304       logger.Info("adding new mirror component on primary")
3305       #HARDCODE
3306       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3307                                       instance, new_drbd,
3308                                       _GetInstanceInfoText(instance)):
3309         # remove secondary dev
3310         cfg.SetDiskID(new_drbd, remote_node)
3311         rpc.call_blockdev_remove(remote_node, new_drbd)
3312         raise errors.OpExecError("Failed to create volume on primary!"
3313                                  " Full abort, cleanup manually!!")
3314
3315       # the device exists now
3316       # call the primary node to add the mirror to md
3317       logger.Info("adding new mirror component to md")
3318       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3319                                            [new_drbd]):
3320         logger.Error("Can't add mirror compoment to md!")
3321         cfg.SetDiskID(new_drbd, remote_node)
3322         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3323           logger.Error("Can't rollback on secondary")
3324         cfg.SetDiskID(new_drbd, instance.primary_node)
3325         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3326           logger.Error("Can't rollback on primary")
3327         raise errors.OpExecError("Full abort, cleanup manually!!")
3328
3329       dev.children.append(new_drbd)
3330       cfg.AddInstance(instance)
3331
3332     # this can fail as the old devices are degraded and _WaitForSync
3333     # does a combined result over all disks, so we don't check its
3334     # return value
3335     _WaitForSync(cfg, instance, self.proc, unlock=True)
3336
3337     # so check manually all the devices
3338     for name in iv_names:
3339       dev, child, new_drbd = iv_names[name]
3340       cfg.SetDiskID(dev, instance.primary_node)
3341       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3342       if is_degr:
3343         raise errors.OpExecError("MD device %s is degraded!" % name)
3344       cfg.SetDiskID(new_drbd, instance.primary_node)
3345       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3346       if is_degr:
3347         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3348
3349     for name in iv_names:
3350       dev, child, new_drbd = iv_names[name]
3351       logger.Info("remove mirror %s component" % name)
3352       cfg.SetDiskID(dev, instance.primary_node)
3353       if not rpc.call_blockdev_removechildren(instance.primary_node,
3354                                               dev, [child]):
3355         logger.Error("Can't remove child from mirror, aborting"
3356                      " *this device cleanup*.\nYou need to cleanup manually!!")
3357         continue
3358
3359       for node in child.logical_id[:2]:
3360         logger.Info("remove child device on %s" % node)
3361         cfg.SetDiskID(child, node)
3362         if not rpc.call_blockdev_remove(node, child):
3363           logger.Error("Warning: failed to remove device from node %s,"
3364                        " continuing operation." % node)
3365
3366       dev.children.remove(child)
3367
3368       cfg.AddInstance(instance)
3369
3370   def _ExecD8DiskOnly(self, feedback_fn):
3371     """Replace a disk on the primary or secondary for dbrd8.
3372
3373     The algorithm for replace is quite complicated:
3374       - for each disk to be replaced:
3375         - create new LVs on the target node with unique names
3376         - detach old LVs from the drbd device
3377         - rename old LVs to name_replaced.<time_t>
3378         - rename new LVs to old LVs
3379         - attach the new LVs (with the old names now) to the drbd device
3380       - wait for sync across all devices
3381       - for each modified disk:
3382         - remove old LVs (which have the name name_replaces.<time_t>)
3383
3384     Failures are not very well handled.
3385
3386     """
3387     steps_total = 6
3388     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3389     instance = self.instance
3390     iv_names = {}
3391     vgname = self.cfg.GetVGName()
3392     # start of work
3393     cfg = self.cfg
3394     tgt_node = self.tgt_node
3395     oth_node = self.oth_node
3396
3397     # Step: check device activation
3398     self.proc.LogStep(1, steps_total, "check device existence")
3399     info("checking volume groups")
3400     my_vg = cfg.GetVGName()
3401     results = rpc.call_vg_list([oth_node, tgt_node])
3402     if not results:
3403       raise errors.OpExecError("Can't list volume groups on the nodes")
3404     for node in oth_node, tgt_node:
3405       res = results.get(node, False)
3406       if not res or my_vg not in res:
3407         raise errors.OpExecError("Volume group '%s' not found on %s" %
3408                                  (my_vg, node))
3409     for dev in instance.disks:
3410       if not dev.iv_name in self.op.disks:
3411         continue
3412       for node in tgt_node, oth_node:
3413         info("checking %s on %s" % (dev.iv_name, node))
3414         cfg.SetDiskID(dev, node)
3415         if not rpc.call_blockdev_find(node, dev):
3416           raise errors.OpExecError("Can't find device %s on node %s" %
3417                                    (dev.iv_name, node))
3418
3419     # Step: check other node consistency
3420     self.proc.LogStep(2, steps_total, "check peer consistency")
3421     for dev in instance.disks:
3422       if not dev.iv_name in self.op.disks:
3423         continue
3424       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3425       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3426                                    oth_node==instance.primary_node):
3427         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3428                                  " to replace disks on this node (%s)" %
3429                                  (oth_node, tgt_node))
3430
3431     # Step: create new storage
3432     self.proc.LogStep(3, steps_total, "allocate new storage")
3433     for dev in instance.disks:
3434       if not dev.iv_name in self.op.disks:
3435         continue
3436       size = dev.size
3437       cfg.SetDiskID(dev, tgt_node)
3438       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3439       names = _GenerateUniqueNames(cfg, lv_names)
3440       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3441                              logical_id=(vgname, names[0]))
3442       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3443                              logical_id=(vgname, names[1]))
3444       new_lvs = [lv_data, lv_meta]
3445       old_lvs = dev.children
3446       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3447       info("creating new local storage on %s for %s" %
3448            (tgt_node, dev.iv_name))
3449       # since we *always* want to create this LV, we use the
3450       # _Create...OnPrimary (which forces the creation), even if we
3451       # are talking about the secondary node
3452       for new_lv in new_lvs:
3453         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3454                                         _GetInstanceInfoText(instance)):
3455           raise errors.OpExecError("Failed to create new LV named '%s' on"
3456                                    " node '%s'" %
3457                                    (new_lv.logical_id[1], tgt_node))
3458
3459     # Step: for each lv, detach+rename*2+attach
3460     self.proc.LogStep(4, steps_total, "change drbd configuration")
3461     for dev, old_lvs, new_lvs in iv_names.itervalues():
3462       info("detaching %s drbd from local storage" % dev.iv_name)
3463       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3464         raise errors.OpExecError("Can't detach drbd from local storage on node"
3465                                  " %s for device %s" % (tgt_node, dev.iv_name))
3466       #dev.children = []
3467       #cfg.Update(instance)
3468
3469       # ok, we created the new LVs, so now we know we have the needed
3470       # storage; as such, we proceed on the target node to rename
3471       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3472       # using the assumption that logical_id == physical_id (which in
3473       # turn is the unique_id on that node)
3474
3475       # FIXME(iustin): use a better name for the replaced LVs
3476       temp_suffix = int(time.time())
3477       ren_fn = lambda d, suff: (d.physical_id[0],
3478                                 d.physical_id[1] + "_replaced-%s" % suff)
3479       # build the rename list based on what LVs exist on the node
3480       rlist = []
3481       for to_ren in old_lvs:
3482         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3483         if find_res is not None: # device exists
3484           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3485
3486       info("renaming the old LVs on the target node")
3487       if not rpc.call_blockdev_rename(tgt_node, rlist):
3488         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3489       # now we rename the new LVs to the old LVs
3490       info("renaming the new LVs on the target node")
3491       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3492       if not rpc.call_blockdev_rename(tgt_node, rlist):
3493         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3494
3495       for old, new in zip(old_lvs, new_lvs):
3496         new.logical_id = old.logical_id
3497         cfg.SetDiskID(new, tgt_node)
3498
3499       for disk in old_lvs:
3500         disk.logical_id = ren_fn(disk, temp_suffix)
3501         cfg.SetDiskID(disk, tgt_node)
3502
3503       # now that the new lvs have the old name, we can add them to the device
3504       info("adding new mirror component on %s" % tgt_node)
3505       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3506         for new_lv in new_lvs:
3507           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3508             warning("Can't rollback device %s", hint="manually cleanup unused"
3509                     " logical volumes")
3510         raise errors.OpExecError("Can't add local storage to drbd")
3511
3512       dev.children = new_lvs
3513       cfg.Update(instance)
3514
3515     # Step: wait for sync
3516
3517     # this can fail as the old devices are degraded and _WaitForSync
3518     # does a combined result over all disks, so we don't check its
3519     # return value
3520     self.proc.LogStep(5, steps_total, "sync devices")
3521     _WaitForSync(cfg, instance, self.proc, unlock=True)
3522
3523     # so check manually all the devices
3524     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3525       cfg.SetDiskID(dev, instance.primary_node)
3526       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3527       if is_degr:
3528         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3529
3530     # Step: remove old storage
3531     self.proc.LogStep(6, steps_total, "removing old storage")
3532     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3533       info("remove logical volumes for %s" % name)
3534       for lv in old_lvs:
3535         cfg.SetDiskID(lv, tgt_node)
3536         if not rpc.call_blockdev_remove(tgt_node, lv):
3537           warning("Can't remove old LV", hint="manually remove unused LVs")
3538           continue
3539
3540   def _ExecD8Secondary(self, feedback_fn):
3541     """Replace the secondary node for drbd8.
3542
3543     The algorithm for replace is quite complicated:
3544       - for all disks of the instance:
3545         - create new LVs on the new node with same names
3546         - shutdown the drbd device on the old secondary
3547         - disconnect the drbd network on the primary
3548         - create the drbd device on the new secondary
3549         - network attach the drbd on the primary, using an artifice:
3550           the drbd code for Attach() will connect to the network if it
3551           finds a device which is connected to the good local disks but
3552           not network enabled
3553       - wait for sync across all devices
3554       - remove all disks from the old secondary
3555
3556     Failures are not very well handled.
3557
3558     """
3559     steps_total = 6
3560     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3561     instance = self.instance
3562     iv_names = {}
3563     vgname = self.cfg.GetVGName()
3564     # start of work
3565     cfg = self.cfg
3566     old_node = self.tgt_node
3567     new_node = self.new_node
3568     pri_node = instance.primary_node
3569
3570     # Step: check device activation
3571     self.proc.LogStep(1, steps_total, "check device existence")
3572     info("checking volume groups")
3573     my_vg = cfg.GetVGName()
3574     results = rpc.call_vg_list([pri_node, new_node])
3575     if not results:
3576       raise errors.OpExecError("Can't list volume groups on the nodes")
3577     for node in pri_node, new_node:
3578       res = results.get(node, False)
3579       if not res or my_vg not in res:
3580         raise errors.OpExecError("Volume group '%s' not found on %s" %
3581                                  (my_vg, node))
3582     for dev in instance.disks:
3583       if not dev.iv_name in self.op.disks:
3584         continue
3585       info("checking %s on %s" % (dev.iv_name, pri_node))
3586       cfg.SetDiskID(dev, pri_node)
3587       if not rpc.call_blockdev_find(pri_node, dev):
3588         raise errors.OpExecError("Can't find device %s on node %s" %
3589                                  (dev.iv_name, pri_node))
3590
3591     # Step: check other node consistency
3592     self.proc.LogStep(2, steps_total, "check peer consistency")
3593     for dev in instance.disks:
3594       if not dev.iv_name in self.op.disks:
3595         continue
3596       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3597       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3598         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3599                                  " unsafe to replace the secondary" %
3600                                  pri_node)
3601
3602     # Step: create new storage
3603     self.proc.LogStep(3, steps_total, "allocate new storage")
3604     for dev in instance.disks:
3605       size = dev.size
3606       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3607       # since we *always* want to create this LV, we use the
3608       # _Create...OnPrimary (which forces the creation), even if we
3609       # are talking about the secondary node
3610       for new_lv in dev.children:
3611         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3612                                         _GetInstanceInfoText(instance)):
3613           raise errors.OpExecError("Failed to create new LV named '%s' on"
3614                                    " node '%s'" %
3615                                    (new_lv.logical_id[1], new_node))
3616
3617       iv_names[dev.iv_name] = (dev, dev.children)
3618
3619     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3620     for dev in instance.disks:
3621       size = dev.size
3622       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3623       # create new devices on new_node
3624       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3625                               logical_id=(pri_node, new_node,
3626                                           dev.logical_id[2]),
3627                               children=dev.children)
3628       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3629                                         new_drbd, False,
3630                                       _GetInstanceInfoText(instance)):
3631         raise errors.OpExecError("Failed to create new DRBD on"
3632                                  " node '%s'" % new_node)
3633
3634     for dev in instance.disks:
3635       # we have new devices, shutdown the drbd on the old secondary
3636       info("shutting down drbd for %s on old node" % dev.iv_name)
3637       cfg.SetDiskID(dev, old_node)
3638       if not rpc.call_blockdev_shutdown(old_node, dev):
3639         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3640                 hint="Please cleanup this device manually as soon as possible")
3641
3642     info("detaching primary drbds from the network (=> standalone)")
3643     done = 0
3644     for dev in instance.disks:
3645       cfg.SetDiskID(dev, pri_node)
3646       # set the physical (unique in bdev terms) id to None, meaning
3647       # detach from network
3648       dev.physical_id = (None,) * len(dev.physical_id)
3649       # and 'find' the device, which will 'fix' it to match the
3650       # standalone state
3651       if rpc.call_blockdev_find(pri_node, dev):
3652         done += 1
3653       else:
3654         warning("Failed to detach drbd %s from network, unusual case" %
3655                 dev.iv_name)
3656
3657     if not done:
3658       # no detaches succeeded (very unlikely)
3659       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3660
3661     # if we managed to detach at least one, we update all the disks of
3662     # the instance to point to the new secondary
3663     info("updating instance configuration")
3664     for dev in instance.disks:
3665       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3666       cfg.SetDiskID(dev, pri_node)
3667     cfg.Update(instance)
3668
3669     # and now perform the drbd attach
3670     info("attaching primary drbds to new secondary (standalone => connected)")
3671     failures = []
3672     for dev in instance.disks:
3673       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3674       # since the attach is smart, it's enough to 'find' the device,
3675       # it will automatically activate the network, if the physical_id
3676       # is correct
3677       cfg.SetDiskID(dev, pri_node)
3678       if not rpc.call_blockdev_find(pri_node, dev):
3679         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3680                 "please do a gnt-instance info to see the status of disks")
3681
3682     # this can fail as the old devices are degraded and _WaitForSync
3683     # does a combined result over all disks, so we don't check its
3684     # return value
3685     self.proc.LogStep(5, steps_total, "sync devices")
3686     _WaitForSync(cfg, instance, self.proc, unlock=True)
3687
3688     # so check manually all the devices
3689     for name, (dev, old_lvs) in iv_names.iteritems():
3690       cfg.SetDiskID(dev, pri_node)
3691       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3692       if is_degr:
3693         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3694
3695     self.proc.LogStep(6, steps_total, "removing old storage")
3696     for name, (dev, old_lvs) in iv_names.iteritems():
3697       info("remove logical volumes for %s" % name)
3698       for lv in old_lvs:
3699         cfg.SetDiskID(lv, old_node)
3700         if not rpc.call_blockdev_remove(old_node, lv):
3701           warning("Can't remove LV on old secondary",
3702                   hint="Cleanup stale volumes by hand")
3703
3704   def Exec(self, feedback_fn):
3705     """Execute disk replacement.
3706
3707     This dispatches the disk replacement to the appropriate handler.
3708
3709     """
3710     instance = self.instance
3711     if instance.disk_template == constants.DT_REMOTE_RAID1:
3712       fn = self._ExecRR1
3713     elif instance.disk_template == constants.DT_DRBD8:
3714       if self.op.remote_node is None:
3715         fn = self._ExecD8DiskOnly
3716       else:
3717         fn = self._ExecD8Secondary
3718     else:
3719       raise errors.ProgrammerError("Unhandled disk replacement case")
3720     return fn(feedback_fn)
3721
3722
3723 class LUQueryInstanceData(NoHooksLU):
3724   """Query runtime instance data.
3725
3726   """
3727   _OP_REQP = ["instances"]
3728
3729   def CheckPrereq(self):
3730     """Check prerequisites.
3731
3732     This only checks the optional instance list against the existing names.
3733
3734     """
3735     if not isinstance(self.op.instances, list):
3736       raise errors.OpPrereqError("Invalid argument type 'instances'")
3737     if self.op.instances:
3738       self.wanted_instances = []
3739       names = self.op.instances
3740       for name in names:
3741         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3742         if instance is None:
3743           raise errors.OpPrereqError("No such instance name '%s'" % name)
3744         self.wanted_instances.append(instance)
3745     else:
3746       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3747                                in self.cfg.GetInstanceList()]
3748     return
3749
3750
3751   def _ComputeDiskStatus(self, instance, snode, dev):
3752     """Compute block device status.
3753
3754     """
3755     self.cfg.SetDiskID(dev, instance.primary_node)
3756     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3757     if dev.dev_type in constants.LDS_DRBD:
3758       # we change the snode then (otherwise we use the one passed in)
3759       if dev.logical_id[0] == instance.primary_node:
3760         snode = dev.logical_id[1]
3761       else:
3762         snode = dev.logical_id[0]
3763
3764     if snode:
3765       self.cfg.SetDiskID(dev, snode)
3766       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3767     else:
3768       dev_sstatus = None
3769
3770     if dev.children:
3771       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3772                       for child in dev.children]
3773     else:
3774       dev_children = []
3775
3776     data = {
3777       "iv_name": dev.iv_name,
3778       "dev_type": dev.dev_type,
3779       "logical_id": dev.logical_id,
3780       "physical_id": dev.physical_id,
3781       "pstatus": dev_pstatus,
3782       "sstatus": dev_sstatus,
3783       "children": dev_children,
3784       }
3785
3786     return data
3787
3788   def Exec(self, feedback_fn):
3789     """Gather and return data"""
3790     result = {}
3791     for instance in self.wanted_instances:
3792       remote_info = rpc.call_instance_info(instance.primary_node,
3793                                                 instance.name)
3794       if remote_info and "state" in remote_info:
3795         remote_state = "up"
3796       else:
3797         remote_state = "down"
3798       if instance.status == "down":
3799         config_state = "down"
3800       else:
3801         config_state = "up"
3802
3803       disks = [self._ComputeDiskStatus(instance, None, device)
3804                for device in instance.disks]
3805
3806       idict = {
3807         "name": instance.name,
3808         "config_state": config_state,
3809         "run_state": remote_state,
3810         "pnode": instance.primary_node,
3811         "snodes": instance.secondary_nodes,
3812         "os": instance.os,
3813         "memory": instance.memory,
3814         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3815         "disks": disks,
3816         "network_port": instance.network_port,
3817         "vcpus": instance.vcpus,
3818         "kernel_path": instance.kernel_path,
3819         "initrd_path": instance.initrd_path,
3820         "hvm_boot_order": instance.hvm_boot_order,
3821         }
3822
3823       result[instance.name] = idict
3824
3825     return result
3826
3827
3828 class LUSetInstanceParms(LogicalUnit):
3829   """Modifies an instances's parameters.
3830
3831   """
3832   HPATH = "instance-modify"
3833   HTYPE = constants.HTYPE_INSTANCE
3834   _OP_REQP = ["instance_name"]
3835
3836   def BuildHooksEnv(self):
3837     """Build hooks env.
3838
3839     This runs on the master, primary and secondaries.
3840
3841     """
3842     args = dict()
3843     if self.mem:
3844       args['memory'] = self.mem
3845     if self.vcpus:
3846       args['vcpus'] = self.vcpus
3847     if self.do_ip or self.do_bridge or self.mac:
3848       if self.do_ip:
3849         ip = self.ip
3850       else:
3851         ip = self.instance.nics[0].ip
3852       if self.bridge:
3853         bridge = self.bridge
3854       else:
3855         bridge = self.instance.nics[0].bridge
3856       if self.mac:
3857         mac = self.mac
3858       else:
3859         mac = self.instance.nics[0].mac
3860       args['nics'] = [(ip, bridge, mac)]
3861     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3862     nl = [self.sstore.GetMasterNode(),
3863           self.instance.primary_node] + list(self.instance.secondary_nodes)
3864     return env, nl, nl
3865
3866   def CheckPrereq(self):
3867     """Check prerequisites.
3868
3869     This only checks the instance list against the existing names.
3870
3871     """
3872     self.mem = getattr(self.op, "mem", None)
3873     self.vcpus = getattr(self.op, "vcpus", None)
3874     self.ip = getattr(self.op, "ip", None)
3875     self.mac = getattr(self.op, "mac", None)
3876     self.bridge = getattr(self.op, "bridge", None)
3877     self.kernel_path = getattr(self.op, "kernel_path", None)
3878     self.initrd_path = getattr(self.op, "initrd_path", None)
3879     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
3880     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
3881                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
3882     if all_parms.count(None) == len(all_parms):
3883       raise errors.OpPrereqError("No changes submitted")
3884     if self.mem is not None:
3885       try:
3886         self.mem = int(self.mem)
3887       except ValueError, err:
3888         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3889     if self.vcpus is not None:
3890       try:
3891         self.vcpus = int(self.vcpus)
3892       except ValueError, err:
3893         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3894     if self.ip is not None:
3895       self.do_ip = True
3896       if self.ip.lower() == "none":
3897         self.ip = None
3898       else:
3899         if not utils.IsValidIP(self.ip):
3900           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3901     else:
3902       self.do_ip = False
3903     self.do_bridge = (self.bridge is not None)
3904     if self.mac is not None:
3905       if self.cfg.IsMacInUse(self.mac):
3906         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
3907                                    self.mac)
3908       if not utils.IsValidMac(self.mac):
3909         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
3910
3911     if self.kernel_path is not None:
3912       self.do_kernel_path = True
3913       if self.kernel_path == constants.VALUE_NONE:
3914         raise errors.OpPrereqError("Can't set instance to no kernel")
3915
3916       if self.kernel_path != constants.VALUE_DEFAULT:
3917         if not os.path.isabs(self.kernel_path):
3918           raise errors.OpPrereqError("The kernel path must be an absolute"
3919                                     " filename")
3920     else:
3921       self.do_kernel_path = False
3922
3923     if self.initrd_path is not None:
3924       self.do_initrd_path = True
3925       if self.initrd_path not in (constants.VALUE_NONE,
3926                                   constants.VALUE_DEFAULT):
3927         if not os.path.isabs(self.initrd_path):
3928           raise errors.OpPrereqError("The initrd path must be an absolute"
3929                                     " filename")
3930     else:
3931       self.do_initrd_path = False
3932
3933     # boot order verification
3934     if self.hvm_boot_order is not None:
3935       if self.hvm_boot_order != constants.VALUE_DEFAULT:
3936         if len(self.hvm_boot_order.strip("acdn")) != 0:
3937           raise errors.OpPrereqError("invalid boot order specified,"
3938                                      " must be one or more of [acdn]"
3939                                      " or 'default'")
3940
3941     instance = self.cfg.GetInstanceInfo(
3942       self.cfg.ExpandInstanceName(self.op.instance_name))
3943     if instance is None:
3944       raise errors.OpPrereqError("No such instance name '%s'" %
3945                                  self.op.instance_name)
3946     self.op.instance_name = instance.name
3947     self.instance = instance
3948     return
3949
3950   def Exec(self, feedback_fn):
3951     """Modifies an instance.
3952
3953     All parameters take effect only at the next restart of the instance.
3954     """
3955     result = []
3956     instance = self.instance
3957     if self.mem:
3958       instance.memory = self.mem
3959       result.append(("mem", self.mem))
3960     if self.vcpus:
3961       instance.vcpus = self.vcpus
3962       result.append(("vcpus",  self.vcpus))
3963     if self.do_ip:
3964       instance.nics[0].ip = self.ip
3965       result.append(("ip", self.ip))
3966     if self.bridge:
3967       instance.nics[0].bridge = self.bridge
3968       result.append(("bridge", self.bridge))
3969     if self.mac:
3970       instance.nics[0].mac = self.mac
3971       result.append(("mac", self.mac))
3972     if self.do_kernel_path:
3973       instance.kernel_path = self.kernel_path
3974       result.append(("kernel_path", self.kernel_path))
3975     if self.do_initrd_path:
3976       instance.initrd_path = self.initrd_path
3977       result.append(("initrd_path", self.initrd_path))
3978     if self.hvm_boot_order:
3979       if self.hvm_boot_order == constants.VALUE_DEFAULT:
3980         instance.hvm_boot_order = None
3981       else:
3982         instance.hvm_boot_order = self.hvm_boot_order
3983       result.append(("hvm_boot_order", self.hvm_boot_order))
3984
3985     self.cfg.AddInstance(instance)
3986
3987     return result
3988
3989
3990 class LUQueryExports(NoHooksLU):
3991   """Query the exports list
3992
3993   """
3994   _OP_REQP = []
3995
3996   def CheckPrereq(self):
3997     """Check that the nodelist contains only existing nodes.
3998
3999     """
4000     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4001
4002   def Exec(self, feedback_fn):
4003     """Compute the list of all the exported system images.
4004
4005     Returns:
4006       a dictionary with the structure node->(export-list)
4007       where export-list is a list of the instances exported on
4008       that node.
4009
4010     """
4011     return rpc.call_export_list(self.nodes)
4012
4013
4014 class LUExportInstance(LogicalUnit):
4015   """Export an instance to an image in the cluster.
4016
4017   """
4018   HPATH = "instance-export"
4019   HTYPE = constants.HTYPE_INSTANCE
4020   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4021
4022   def BuildHooksEnv(self):
4023     """Build hooks env.
4024
4025     This will run on the master, primary node and target node.
4026
4027     """
4028     env = {
4029       "EXPORT_NODE": self.op.target_node,
4030       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4031       }
4032     env.update(_BuildInstanceHookEnvByObject(self.instance))
4033     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4034           self.op.target_node]
4035     return env, nl, nl
4036
4037   def CheckPrereq(self):
4038     """Check prerequisites.
4039
4040     This checks that the instance name is a valid one.
4041
4042     """
4043     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4044     self.instance = self.cfg.GetInstanceInfo(instance_name)
4045     if self.instance is None:
4046       raise errors.OpPrereqError("Instance '%s' not found" %
4047                                  self.op.instance_name)
4048
4049     # node verification
4050     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4051     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4052
4053     if self.dst_node is None:
4054       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4055                                  self.op.target_node)
4056     self.op.target_node = self.dst_node.name
4057
4058   def Exec(self, feedback_fn):
4059     """Export an instance to an image in the cluster.
4060
4061     """
4062     instance = self.instance
4063     dst_node = self.dst_node
4064     src_node = instance.primary_node
4065     # shutdown the instance, unless requested not to do so
4066     if self.op.shutdown:
4067       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4068       self.proc.ChainOpCode(op)
4069
4070     vgname = self.cfg.GetVGName()
4071
4072     snap_disks = []
4073
4074     try:
4075       for disk in instance.disks:
4076         if disk.iv_name == "sda":
4077           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4078           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4079
4080           if not new_dev_name:
4081             logger.Error("could not snapshot block device %s on node %s" %
4082                          (disk.logical_id[1], src_node))
4083           else:
4084             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4085                                       logical_id=(vgname, new_dev_name),
4086                                       physical_id=(vgname, new_dev_name),
4087                                       iv_name=disk.iv_name)
4088             snap_disks.append(new_dev)
4089
4090     finally:
4091       if self.op.shutdown:
4092         op = opcodes.OpStartupInstance(instance_name=instance.name,
4093                                        force=False)
4094         self.proc.ChainOpCode(op)
4095
4096     # TODO: check for size
4097
4098     for dev in snap_disks:
4099       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4100                                            instance):
4101         logger.Error("could not export block device %s from node"
4102                      " %s to node %s" %
4103                      (dev.logical_id[1], src_node, dst_node.name))
4104       if not rpc.call_blockdev_remove(src_node, dev):
4105         logger.Error("could not remove snapshot block device %s from"
4106                      " node %s" % (dev.logical_id[1], src_node))
4107
4108     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4109       logger.Error("could not finalize export for instance %s on node %s" %
4110                    (instance.name, dst_node.name))
4111
4112     nodelist = self.cfg.GetNodeList()
4113     nodelist.remove(dst_node.name)
4114
4115     # on one-node clusters nodelist will be empty after the removal
4116     # if we proceed the backup would be removed because OpQueryExports
4117     # substitutes an empty list with the full cluster node list.
4118     if nodelist:
4119       op = opcodes.OpQueryExports(nodes=nodelist)
4120       exportlist = self.proc.ChainOpCode(op)
4121       for node in exportlist:
4122         if instance.name in exportlist[node]:
4123           if not rpc.call_export_remove(node, instance.name):
4124             logger.Error("could not remove older export for instance %s"
4125                          " on node %s" % (instance.name, node))
4126
4127
4128 class TagsLU(NoHooksLU):
4129   """Generic tags LU.
4130
4131   This is an abstract class which is the parent of all the other tags LUs.
4132
4133   """
4134   def CheckPrereq(self):
4135     """Check prerequisites.
4136
4137     """
4138     if self.op.kind == constants.TAG_CLUSTER:
4139       self.target = self.cfg.GetClusterInfo()
4140     elif self.op.kind == constants.TAG_NODE:
4141       name = self.cfg.ExpandNodeName(self.op.name)
4142       if name is None:
4143         raise errors.OpPrereqError("Invalid node name (%s)" %
4144                                    (self.op.name,))
4145       self.op.name = name
4146       self.target = self.cfg.GetNodeInfo(name)
4147     elif self.op.kind == constants.TAG_INSTANCE:
4148       name = self.cfg.ExpandInstanceName(self.op.name)
4149       if name is None:
4150         raise errors.OpPrereqError("Invalid instance name (%s)" %
4151                                    (self.op.name,))
4152       self.op.name = name
4153       self.target = self.cfg.GetInstanceInfo(name)
4154     else:
4155       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4156                                  str(self.op.kind))
4157
4158
4159 class LUGetTags(TagsLU):
4160   """Returns the tags of a given object.
4161
4162   """
4163   _OP_REQP = ["kind", "name"]
4164
4165   def Exec(self, feedback_fn):
4166     """Returns the tag list.
4167
4168     """
4169     return self.target.GetTags()
4170
4171
4172 class LUSearchTags(NoHooksLU):
4173   """Searches the tags for a given pattern.
4174
4175   """
4176   _OP_REQP = ["pattern"]
4177
4178   def CheckPrereq(self):
4179     """Check prerequisites.
4180
4181     This checks the pattern passed for validity by compiling it.
4182
4183     """
4184     try:
4185       self.re = re.compile(self.op.pattern)
4186     except re.error, err:
4187       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4188                                  (self.op.pattern, err))
4189
4190   def Exec(self, feedback_fn):
4191     """Returns the tag list.
4192
4193     """
4194     cfg = self.cfg
4195     tgts = [("/cluster", cfg.GetClusterInfo())]
4196     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4197     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4198     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4199     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4200     results = []
4201     for path, target in tgts:
4202       for tag in target.GetTags():
4203         if self.re.search(tag):
4204           results.append((path, tag))
4205     return results
4206
4207
4208 class LUAddTags(TagsLU):
4209   """Sets a tag on a given object.
4210
4211   """
4212   _OP_REQP = ["kind", "name", "tags"]
4213
4214   def CheckPrereq(self):
4215     """Check prerequisites.
4216
4217     This checks the type and length of the tag name and value.
4218
4219     """
4220     TagsLU.CheckPrereq(self)
4221     for tag in self.op.tags:
4222       objects.TaggableObject.ValidateTag(tag)
4223
4224   def Exec(self, feedback_fn):
4225     """Sets the tag.
4226
4227     """
4228     try:
4229       for tag in self.op.tags:
4230         self.target.AddTag(tag)
4231     except errors.TagError, err:
4232       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4233     try:
4234       self.cfg.Update(self.target)
4235     except errors.ConfigurationError:
4236       raise errors.OpRetryError("There has been a modification to the"
4237                                 " config file and the operation has been"
4238                                 " aborted. Please retry.")
4239
4240
4241 class LUDelTags(TagsLU):
4242   """Delete a list of tags from a given object.
4243
4244   """
4245   _OP_REQP = ["kind", "name", "tags"]
4246
4247   def CheckPrereq(self):
4248     """Check prerequisites.
4249
4250     This checks that we have the given tag.
4251
4252     """
4253     TagsLU.CheckPrereq(self)
4254     for tag in self.op.tags:
4255       objects.TaggableObject.ValidateTag(tag)
4256     del_tags = frozenset(self.op.tags)
4257     cur_tags = self.target.GetTags()
4258     if not del_tags <= cur_tags:
4259       diff_tags = del_tags - cur_tags
4260       diff_names = ["'%s'" % tag for tag in diff_tags]
4261       diff_names.sort()
4262       raise errors.OpPrereqError("Tag(s) %s not found" %
4263                                  (",".join(diff_names)))
4264
4265   def Exec(self, feedback_fn):
4266     """Remove the tag from the object.
4267
4268     """
4269     for tag in self.op.tags:
4270       self.target.RemoveTag(tag)
4271     try:
4272       self.cfg.Update(self.target)
4273     except errors.ConfigurationError:
4274       raise errors.OpRetryError("There has been a modification to the"
4275                                 " config file and the operation has been"
4276                                 " aborted. Please retry.")
4277
4278 class LUTestDelay(NoHooksLU):
4279   """Sleep for a specified amount of time.
4280
4281   This LU sleeps on the master and/or nodes for a specified amoutn of
4282   time.
4283
4284   """
4285   _OP_REQP = ["duration", "on_master", "on_nodes"]
4286
4287   def CheckPrereq(self):
4288     """Check prerequisites.
4289
4290     This checks that we have a good list of nodes and/or the duration
4291     is valid.
4292
4293     """
4294
4295     if self.op.on_nodes:
4296       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4297
4298   def Exec(self, feedback_fn):
4299     """Do the actual sleep.
4300
4301     """
4302     if self.op.on_master:
4303       if not utils.TestDelay(self.op.duration):
4304         raise errors.OpExecError("Error during master delay test")
4305     if self.op.on_nodes:
4306       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4307       if not result:
4308         raise errors.OpExecError("Complete failure from rpc call")
4309       for node, node_result in result.items():
4310         if not node_result:
4311           raise errors.OpExecError("Failure during rpc call to node %s,"
4312                                    " result: %s" % (node, node_result))