Merge branch 'next' into branch-2.1
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 class RPCFail(Exception):
50   """Class denoting RPC failure.
51
52   Its argument is the error message.
53
54   """
55
56 def _Fail(msg, *args, **kwargs):
57   """Log an error and the raise an RPCFail exception.
58
59   This exception is then handled specially in the ganeti daemon and
60   turned into a 'failed' return type. As such, this function is a
61   useful shortcut for logging the error and returning it to the master
62   daemon.
63
64   @type msg: string
65   @param msg: the text of the exception
66   @raise RPCFail
67
68   """
69   if args:
70     msg = msg % args
71   if "log" not in kwargs or kwargs["log"]: # if we should log this error
72     if "exc" in kwargs and kwargs["exc"]:
73       logging.exception(msg)
74     else:
75       logging.error(msg)
76   raise RPCFail(msg)
77
78
79 def _GetConfig():
80   """Simple wrapper to return a SimpleStore.
81
82   @rtype: L{ssconf.SimpleStore}
83   @return: a SimpleStore instance
84
85   """
86   return ssconf.SimpleStore()
87
88
89 def _GetSshRunner(cluster_name):
90   """Simple wrapper to return an SshRunner.
91
92   @type cluster_name: str
93   @param cluster_name: the cluster name, which is needed
94       by the SshRunner constructor
95   @rtype: L{ssh.SshRunner}
96   @return: an SshRunner instance
97
98   """
99   return ssh.SshRunner(cluster_name)
100
101
102 def _Decompress(data):
103   """Unpacks data compressed by the RPC client.
104
105   @type data: list or tuple
106   @param data: Data sent by RPC client
107   @rtype: str
108   @return: Decompressed data
109
110   """
111   assert isinstance(data, (list, tuple))
112   assert len(data) == 2
113   (encoding, content) = data
114   if encoding == constants.RPC_ENCODING_NONE:
115     return content
116   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
117     return zlib.decompress(base64.b64decode(content))
118   else:
119     raise AssertionError("Unknown data encoding")
120
121
122 def _CleanDirectory(path, exclude=None):
123   """Removes all regular files in a directory.
124
125   @type path: str
126   @param path: the directory to clean
127   @type exclude: list
128   @param exclude: list of files to be excluded, defaults
129       to the empty list
130
131   """
132   if not os.path.isdir(path):
133     return
134   if exclude is None:
135     exclude = []
136   else:
137     # Normalize excluded paths
138     exclude = [os.path.normpath(i) for i in exclude]
139
140   for rel_name in utils.ListVisibleFiles(path):
141     full_name = os.path.normpath(os.path.join(path, rel_name))
142     if full_name in exclude:
143       continue
144     if os.path.isfile(full_name) and not os.path.islink(full_name):
145       utils.RemoveFile(full_name)
146
147
148 def JobQueuePurge():
149   """Removes job queue files and archived jobs.
150
151   @rtype: tuple
152   @return: True, None
153
154   """
155   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
156   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
157
158
159 def GetMasterInfo():
160   """Returns master information.
161
162   This is an utility function to compute master information, either
163   for consumption here or from the node daemon.
164
165   @rtype: tuple
166   @return: master_netdev, master_ip, master_name
167   @raise RPCFail: in case of errors
168
169   """
170   try:
171     cfg = _GetConfig()
172     master_netdev = cfg.GetMasterNetdev()
173     master_ip = cfg.GetMasterIP()
174     master_node = cfg.GetMasterNode()
175   except errors.ConfigurationError, err:
176     _Fail("Cluster configuration incomplete: %s", err, exc=True)
177   return (master_netdev, master_ip, master_node)
178
179
180 def StartMaster(start_daemons):
181   """Activate local node as master node.
182
183   The function will always try activate the IP address of the master
184   (unless someone else has it). It will also start the master daemons,
185   based on the start_daemons parameter.
186
187   @type start_daemons: boolean
188   @param start_daemons: whether to also start the master
189       daemons (ganeti-masterd and ganeti-rapi)
190   @rtype: None
191
192   """
193   # GetMasterInfo will raise an exception if not able to return data
194   master_netdev, master_ip, _ = GetMasterInfo()
195
196   err_msgs = []
197   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198     if utils.OwnIpAddress(master_ip):
199       # we already have the ip:
200       logging.debug("Master IP already configured, doing nothing")
201     else:
202       msg = "Someone else has the master ip, not activating"
203       logging.error(msg)
204       err_msgs.append(msg)
205   else:
206     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
207                            "dev", master_netdev, "label",
208                            "%s:0" % master_netdev])
209     if result.failed:
210       msg = "Can't activate master IP: %s" % result.output
211       logging.error(msg)
212       err_msgs.append(msg)
213
214     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
215                            "-s", master_ip, master_ip])
216     # we'll ignore the exit code of arping
217
218   # and now start the master and rapi daemons
219   if start_daemons:
220     for daemon in 'ganeti-masterd', 'ganeti-rapi':
221       result = utils.RunCmd([daemon])
222       if result.failed:
223         msg = "Can't start daemon %s: %s" % (daemon, result.output)
224         logging.error(msg)
225         err_msgs.append(msg)
226
227   if err_msgs:
228     _Fail("; ".join(err_msgs))
229
230
231 def StopMaster(stop_daemons):
232   """Deactivate this node as master.
233
234   The function will always try to deactivate the IP address of the
235   master. It will also stop the master daemons depending on the
236   stop_daemons parameter.
237
238   @type stop_daemons: boolean
239   @param stop_daemons: whether to also stop the master daemons
240       (ganeti-masterd and ganeti-rapi)
241   @rtype: None
242
243   """
244   # TODO: log and report back to the caller the error failures; we
245   # need to decide in which case we fail the RPC for this
246
247   # GetMasterInfo will raise an exception if not able to return data
248   master_netdev, master_ip, _ = GetMasterInfo()
249
250   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
251                          "dev", master_netdev])
252   if result.failed:
253     logging.error("Can't remove the master IP, error: %s", result.output)
254     # but otherwise ignore the failure
255
256   if stop_daemons:
257     # stop/kill the rapi and the master daemon
258     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
259       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
260
261
262 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
263   """Joins this node to the cluster.
264
265   This does the following:
266       - updates the hostkeys of the machine (rsa and dsa)
267       - adds the ssh private key to the user
268       - adds the ssh public key to the users' authorized_keys file
269
270   @type dsa: str
271   @param dsa: the DSA private key to write
272   @type dsapub: str
273   @param dsapub: the DSA public key to write
274   @type rsa: str
275   @param rsa: the RSA private key to write
276   @type rsapub: str
277   @param rsapub: the RSA public key to write
278   @type sshkey: str
279   @param sshkey: the SSH private key to write
280   @type sshpub: str
281   @param sshpub: the SSH public key to write
282   @rtype: boolean
283   @return: the success of the operation
284
285   """
286   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
287                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
288                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
289                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
290   for name, content, mode in sshd_keys:
291     utils.WriteFile(name, data=content, mode=mode)
292
293   try:
294     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
295                                                     mkdir=True)
296   except errors.OpExecError, err:
297     _Fail("Error while processing user ssh files: %s", err, exc=True)
298
299   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
300     utils.WriteFile(name, data=content, mode=0600)
301
302   utils.AddAuthorizedKey(auth_keys, sshpub)
303
304   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
305
306
307 def LeaveCluster():
308   """Cleans up and remove the current node.
309
310   This function cleans up and prepares the current node to be removed
311   from the cluster.
312
313   If processing is successful, then it raises an
314   L{errors.QuitGanetiException} which is used as a special case to
315   shutdown the node daemon.
316
317   """
318   _CleanDirectory(constants.DATA_DIR)
319   JobQueuePurge()
320
321   try:
322     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
323
324     f = open(pub_key, 'r')
325     try:
326       utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
327     finally:
328       f.close()
329
330     utils.RemoveFile(priv_key)
331     utils.RemoveFile(pub_key)
332   except errors.OpExecError:
333     logging.exception("Error while processing ssh files")
334
335   # Raise a custom exception (handled in ganeti-noded)
336   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
337
338
339 def GetNodeInfo(vgname, hypervisor_type):
340   """Gives back a hash with different information about the node.
341
342   @type vgname: C{string}
343   @param vgname: the name of the volume group to ask for disk space information
344   @type hypervisor_type: C{str}
345   @param hypervisor_type: the name of the hypervisor to ask for
346       memory information
347   @rtype: C{dict}
348   @return: dictionary with the following keys:
349       - vg_size is the size of the configured volume group in MiB
350       - vg_free is the free size of the volume group in MiB
351       - memory_dom0 is the memory allocated for domain0 in MiB
352       - memory_free is the currently available (free) ram in MiB
353       - memory_total is the total number of ram in MiB
354
355   """
356   outputarray = {}
357   vginfo = _GetVGInfo(vgname)
358   outputarray['vg_size'] = vginfo['vg_size']
359   outputarray['vg_free'] = vginfo['vg_free']
360
361   hyper = hypervisor.GetHypervisor(hypervisor_type)
362   hyp_info = hyper.GetNodeInfo()
363   if hyp_info is not None:
364     outputarray.update(hyp_info)
365
366   f = open("/proc/sys/kernel/random/boot_id", 'r')
367   try:
368     outputarray["bootid"] = f.read(128).rstrip("\n")
369   finally:
370     f.close()
371
372   return outputarray
373
374
375 def VerifyNode(what, cluster_name):
376   """Verify the status of the local node.
377
378   Based on the input L{what} parameter, various checks are done on the
379   local node.
380
381   If the I{filelist} key is present, this list of
382   files is checksummed and the file/checksum pairs are returned.
383
384   If the I{nodelist} key is present, we check that we have
385   connectivity via ssh with the target nodes (and check the hostname
386   report).
387
388   If the I{node-net-test} key is present, we check that we have
389   connectivity to the given nodes via both primary IP and, if
390   applicable, secondary IPs.
391
392   @type what: C{dict}
393   @param what: a dictionary of things to check:
394       - filelist: list of files for which to compute checksums
395       - nodelist: list of nodes we should check ssh communication with
396       - node-net-test: list of nodes we should check node daemon port
397         connectivity with
398       - hypervisor: list with hypervisors to run the verify for
399   @rtype: dict
400   @return: a dictionary with the same keys as the input dict, and
401       values representing the result of the checks
402
403   """
404   result = {}
405
406   if constants.NV_HYPERVISOR in what:
407     result[constants.NV_HYPERVISOR] = tmp = {}
408     for hv_name in what[constants.NV_HYPERVISOR]:
409       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
410
411   if constants.NV_FILELIST in what:
412     result[constants.NV_FILELIST] = utils.FingerprintFiles(
413       what[constants.NV_FILELIST])
414
415   if constants.NV_NODELIST in what:
416     result[constants.NV_NODELIST] = tmp = {}
417     random.shuffle(what[constants.NV_NODELIST])
418     for node in what[constants.NV_NODELIST]:
419       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
420       if not success:
421         tmp[node] = message
422
423   if constants.NV_NODENETTEST in what:
424     result[constants.NV_NODENETTEST] = tmp = {}
425     my_name = utils.HostInfo().name
426     my_pip = my_sip = None
427     for name, pip, sip in what[constants.NV_NODENETTEST]:
428       if name == my_name:
429         my_pip = pip
430         my_sip = sip
431         break
432     if not my_pip:
433       tmp[my_name] = ("Can't find my own primary/secondary IP"
434                       " in the node list")
435     else:
436       port = utils.GetNodeDaemonPort()
437       for name, pip, sip in what[constants.NV_NODENETTEST]:
438         fail = []
439         if not utils.TcpPing(pip, port, source=my_pip):
440           fail.append("primary")
441         if sip != pip:
442           if not utils.TcpPing(sip, port, source=my_sip):
443             fail.append("secondary")
444         if fail:
445           tmp[name] = ("failure using the %s interface(s)" %
446                        " and ".join(fail))
447
448   if constants.NV_LVLIST in what:
449     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
450
451   if constants.NV_INSTANCELIST in what:
452     result[constants.NV_INSTANCELIST] = GetInstanceList(
453       what[constants.NV_INSTANCELIST])
454
455   if constants.NV_VGLIST in what:
456     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
457
458   if constants.NV_VERSION in what:
459     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
460                                     constants.RELEASE_VERSION)
461
462   if constants.NV_HVINFO in what:
463     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
464     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
465
466   if constants.NV_DRBDLIST in what:
467     try:
468       used_minors = bdev.DRBD8.GetUsedDevs().keys()
469     except errors.BlockDeviceError, err:
470       logging.warning("Can't get used minors list", exc_info=True)
471       used_minors = str(err)
472     result[constants.NV_DRBDLIST] = used_minors
473
474   return result
475
476
477 def GetVolumeList(vg_name):
478   """Compute list of logical volumes and their size.
479
480   @type vg_name: str
481   @param vg_name: the volume group whose LVs we should list
482   @rtype: dict
483   @return:
484       dictionary of all partions (key) with value being a tuple of
485       their size (in MiB), inactive and online status::
486
487         {'test1': ('20.06', True, True)}
488
489       in case of errors, a string is returned with the error
490       details.
491
492   """
493   lvs = {}
494   sep = '|'
495   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
496                          "--separator=%s" % sep,
497                          "-olv_name,lv_size,lv_attr", vg_name])
498   if result.failed:
499     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
500
501   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
502   for line in result.stdout.splitlines():
503     line = line.strip()
504     match = valid_line_re.match(line)
505     if not match:
506       logging.error("Invalid line returned from lvs output: '%s'", line)
507       continue
508     name, size, attr = match.groups()
509     inactive = attr[4] == '-'
510     online = attr[5] == 'o'
511     lvs[name] = (size, inactive, online)
512
513   return lvs
514
515
516 def ListVolumeGroups():
517   """List the volume groups and their size.
518
519   @rtype: dict
520   @return: dictionary with keys volume name and values the
521       size of the volume
522
523   """
524   return utils.ListVolumeGroups()
525
526
527 def NodeVolumes():
528   """List all volumes on this node.
529
530   @rtype: list
531   @return:
532     A list of dictionaries, each having four keys:
533       - name: the logical volume name,
534       - size: the size of the logical volume
535       - dev: the physical device on which the LV lives
536       - vg: the volume group to which it belongs
537
538     In case of errors, we return an empty list and log the
539     error.
540
541     Note that since a logical volume can live on multiple physical
542     volumes, the resulting list might include a logical volume
543     multiple times.
544
545   """
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=|",
548                          "--options=lv_name,lv_size,devices,vg_name"])
549   if result.failed:
550     _Fail("Failed to list logical volumes, lvs output: %s",
551           result.output)
552
553   def parse_dev(dev):
554     if '(' in dev:
555       return dev.split('(')[0]
556     else:
557       return dev
558
559   def map_line(line):
560     return {
561       'name': line[0].strip(),
562       'size': line[1].strip(),
563       'dev': parse_dev(line[2].strip()),
564       'vg': line[3].strip(),
565     }
566
567   return [map_line(line.split('|')) for line in result.stdout.splitlines()
568           if line.count('|') >= 3]
569
570
571 def BridgesExist(bridges_list):
572   """Check if a list of bridges exist on the current node.
573
574   @rtype: boolean
575   @return: C{True} if all of them exist, C{False} otherwise
576
577   """
578   missing = []
579   for bridge in bridges_list:
580     if not utils.BridgeExists(bridge):
581       missing.append(bridge)
582
583   if missing:
584     _Fail("Missing bridges %s", ", ".join(missing))
585
586
587 def GetInstanceList(hypervisor_list):
588   """Provides a list of instances.
589
590   @type hypervisor_list: list
591   @param hypervisor_list: the list of hypervisors to query information
592
593   @rtype: list
594   @return: a list of all running instances on the current node
595     - instance1.example.com
596     - instance2.example.com
597
598   """
599   results = []
600   for hname in hypervisor_list:
601     try:
602       names = hypervisor.GetHypervisor(hname).ListInstances()
603       results.extend(names)
604     except errors.HypervisorError, err:
605       _Fail("Error enumerating instances (hypervisor %s): %s",
606             hname, err, exc=True)
607
608   return results
609
610
611 def GetInstanceInfo(instance, hname):
612   """Gives back the information about an instance as a dictionary.
613
614   @type instance: string
615   @param instance: the instance name
616   @type hname: string
617   @param hname: the hypervisor type of the instance
618
619   @rtype: dict
620   @return: dictionary with the following keys:
621       - memory: memory size of instance (int)
622       - state: xen state of instance (string)
623       - time: cpu time of instance (float)
624
625   """
626   output = {}
627
628   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
629   if iinfo is not None:
630     output['memory'] = iinfo[2]
631     output['state'] = iinfo[4]
632     output['time'] = iinfo[5]
633
634   return output
635
636
637 def GetInstanceMigratable(instance):
638   """Gives whether an instance can be migrated.
639
640   @type instance: L{objects.Instance}
641   @param instance: object representing the instance to be checked.
642
643   @rtype: tuple
644   @return: tuple of (result, description) where:
645       - result: whether the instance can be migrated or not
646       - description: a description of the issue, if relevant
647
648   """
649   hyper = hypervisor.GetHypervisor(instance.hypervisor)
650   iname = instance.name
651   if iname not in hyper.ListInstances():
652     _Fail("Instance %s is not running", iname)
653
654   for idx in range(len(instance.disks)):
655     link_name = _GetBlockDevSymlinkPath(iname, idx)
656     if not os.path.islink(link_name):
657       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
658
659
660 def GetAllInstancesInfo(hypervisor_list):
661   """Gather data about all instances.
662
663   This is the equivalent of L{GetInstanceInfo}, except that it
664   computes data for all instances at once, thus being faster if one
665   needs data about more than one instance.
666
667   @type hypervisor_list: list
668   @param hypervisor_list: list of hypervisors to query for instance data
669
670   @rtype: dict
671   @return: dictionary of instance: data, with data having the following keys:
672       - memory: memory size of instance (int)
673       - state: xen state of instance (string)
674       - time: cpu time of instance (float)
675       - vcpus: the number of vcpus
676
677   """
678   output = {}
679
680   for hname in hypervisor_list:
681     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
682     if iinfo:
683       for name, _, memory, vcpus, state, times in iinfo:
684         value = {
685           'memory': memory,
686           'vcpus': vcpus,
687           'state': state,
688           'time': times,
689           }
690         if name in output:
691           # we only check static parameters, like memory and vcpus,
692           # and not state and time which can change between the
693           # invocations of the different hypervisors
694           for key in 'memory', 'vcpus':
695             if value[key] != output[name][key]:
696               _Fail("Instance %s is running twice"
697                     " with different parameters", name)
698         output[name] = value
699
700   return output
701
702
703 def InstanceOsAdd(instance, reinstall):
704   """Add an OS to an instance.
705
706   @type instance: L{objects.Instance}
707   @param instance: Instance whose OS is to be installed
708   @type reinstall: boolean
709   @param reinstall: whether this is an instance reinstall
710   @rtype: None
711
712   """
713   inst_os = OSFromDisk(instance.os)
714
715   create_env = OSEnvironment(instance, inst_os)
716   if reinstall:
717     create_env['INSTANCE_REINSTALL'] = "1"
718
719   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
720                                      instance.name, int(time.time()))
721
722   result = utils.RunCmd([inst_os.create_script], env=create_env,
723                         cwd=inst_os.path, output=logfile,)
724   if result.failed:
725     logging.error("os create command '%s' returned error: %s, logfile: %s,"
726                   " output: %s", result.cmd, result.fail_reason, logfile,
727                   result.output)
728     lines = [utils.SafeEncode(val)
729              for val in utils.TailFile(logfile, lines=20)]
730     _Fail("OS create script failed (%s), last lines in the"
731           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
732
733
734 def RunRenameInstance(instance, old_name):
735   """Run the OS rename script for an instance.
736
737   @type instance: L{objects.Instance}
738   @param instance: Instance whose OS is to be installed
739   @type old_name: string
740   @param old_name: previous instance name
741   @rtype: boolean
742   @return: the success of the operation
743
744   """
745   inst_os = OSFromDisk(instance.os)
746
747   rename_env = OSEnvironment(instance, inst_os)
748   rename_env['OLD_INSTANCE_NAME'] = old_name
749
750   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
751                                            old_name,
752                                            instance.name, int(time.time()))
753
754   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
755                         cwd=inst_os.path, output=logfile)
756
757   if result.failed:
758     logging.error("os create command '%s' returned error: %s output: %s",
759                   result.cmd, result.fail_reason, result.output)
760     lines = [utils.SafeEncode(val)
761              for val in utils.TailFile(logfile, lines=20)]
762     _Fail("OS rename script failed (%s), last lines in the"
763           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
764
765
766 def _GetVGInfo(vg_name):
767   """Get information about the volume group.
768
769   @type vg_name: str
770   @param vg_name: the volume group which we query
771   @rtype: dict
772   @return:
773     A dictionary with the following keys:
774       - C{vg_size} is the total size of the volume group in MiB
775       - C{vg_free} is the free size of the volume group in MiB
776       - C{pv_count} are the number of physical disks in that VG
777
778     If an error occurs during gathering of data, we return the same dict
779     with keys all set to None.
780
781   """
782   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
783
784   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
785                          "--nosuffix", "--units=m", "--separator=:", vg_name])
786
787   if retval.failed:
788     logging.error("volume group %s not present", vg_name)
789     return retdic
790   valarr = retval.stdout.strip().rstrip(':').split(':')
791   if len(valarr) == 3:
792     try:
793       retdic = {
794         "vg_size": int(round(float(valarr[0]), 0)),
795         "vg_free": int(round(float(valarr[1]), 0)),
796         "pv_count": int(valarr[2]),
797         }
798     except ValueError, err:
799       logging.exception("Fail to parse vgs output: %s", err)
800   else:
801     logging.error("vgs output has the wrong number of fields (expected"
802                   " three): %s", str(valarr))
803   return retdic
804
805
806 def _GetBlockDevSymlinkPath(instance_name, idx):
807   return os.path.join(constants.DISK_LINKS_DIR,
808                       "%s:%d" % (instance_name, idx))
809
810
811 def _SymlinkBlockDev(instance_name, device_path, idx):
812   """Set up symlinks to a instance's block device.
813
814   This is an auxiliary function run when an instance is start (on the primary
815   node) or when an instance is migrated (on the target node).
816
817
818   @param instance_name: the name of the target instance
819   @param device_path: path of the physical block device, on the node
820   @param idx: the disk index
821   @return: absolute path to the disk's symlink
822
823   """
824   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
825   try:
826     os.symlink(device_path, link_name)
827   except OSError, err:
828     if err.errno == errno.EEXIST:
829       if (not os.path.islink(link_name) or
830           os.readlink(link_name) != device_path):
831         os.remove(link_name)
832         os.symlink(device_path, link_name)
833     else:
834       raise
835
836   return link_name
837
838
839 def _RemoveBlockDevLinks(instance_name, disks):
840   """Remove the block device symlinks belonging to the given instance.
841
842   """
843   for idx, _ in enumerate(disks):
844     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
845     if os.path.islink(link_name):
846       try:
847         os.remove(link_name)
848       except OSError:
849         logging.exception("Can't remove symlink '%s'", link_name)
850
851
852 def _GatherAndLinkBlockDevs(instance):
853   """Set up an instance's block device(s).
854
855   This is run on the primary node at instance startup. The block
856   devices must be already assembled.
857
858   @type instance: L{objects.Instance}
859   @param instance: the instance whose disks we shoul assemble
860   @rtype: list
861   @return: list of (disk_object, device_path)
862
863   """
864   block_devices = []
865   for idx, disk in enumerate(instance.disks):
866     device = _RecursiveFindBD(disk)
867     if device is None:
868       raise errors.BlockDeviceError("Block device '%s' is not set up." %
869                                     str(disk))
870     device.Open()
871     try:
872       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
873     except OSError, e:
874       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
875                                     e.strerror)
876
877     block_devices.append((disk, link_name))
878
879   return block_devices
880
881
882 def StartInstance(instance):
883   """Start an instance.
884
885   @type instance: L{objects.Instance}
886   @param instance: the instance object
887   @rtype: None
888
889   """
890   running_instances = GetInstanceList([instance.hypervisor])
891
892   if instance.name in running_instances:
893     logging.info("Instance %s already running, not starting", instance.name)
894     return
895
896   try:
897     block_devices = _GatherAndLinkBlockDevs(instance)
898     hyper = hypervisor.GetHypervisor(instance.hypervisor)
899     hyper.StartInstance(instance, block_devices)
900   except errors.BlockDeviceError, err:
901     _Fail("Block device error: %s", err, exc=True)
902   except errors.HypervisorError, err:
903     _RemoveBlockDevLinks(instance.name, instance.disks)
904     _Fail("Hypervisor error: %s", err, exc=True)
905
906
907 def InstanceShutdown(instance):
908   """Shut an instance down.
909
910   @note: this functions uses polling with a hardcoded timeout.
911
912   @type instance: L{objects.Instance}
913   @param instance: the instance object
914   @rtype: None
915
916   """
917   hv_name = instance.hypervisor
918   running_instances = GetInstanceList([hv_name])
919   iname = instance.name
920
921   if iname not in running_instances:
922     logging.info("Instance %s not running, doing nothing", iname)
923     return
924
925   hyper = hypervisor.GetHypervisor(hv_name)
926   try:
927     hyper.StopInstance(instance)
928   except errors.HypervisorError, err:
929     _Fail("Failed to stop instance %s: %s", iname, err)
930
931   # test every 10secs for 2min
932
933   time.sleep(1)
934   for _ in range(11):
935     if instance.name not in GetInstanceList([hv_name]):
936       break
937     time.sleep(10)
938   else:
939     # the shutdown did not succeed
940     logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
941
942     try:
943       hyper.StopInstance(instance, force=True)
944     except errors.HypervisorError, err:
945       _Fail("Failed to force stop instance %s: %s", iname, err)
946
947     time.sleep(1)
948     if instance.name in GetInstanceList([hv_name]):
949       _Fail("Could not shutdown instance %s even by destroy", iname)
950
951   _RemoveBlockDevLinks(iname, instance.disks)
952
953
954 def InstanceReboot(instance, reboot_type):
955   """Reboot an instance.
956
957   @type instance: L{objects.Instance}
958   @param instance: the instance object to reboot
959   @type reboot_type: str
960   @param reboot_type: the type of reboot, one the following
961     constants:
962       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
963         instance OS, do not recreate the VM
964       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
965         restart the VM (at the hypervisor level)
966       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
967         not accepted here, since that mode is handled differently, in
968         cmdlib, and translates into full stop and start of the
969         instance (instead of a call_instance_reboot RPC)
970   @rtype: None
971
972   """
973   running_instances = GetInstanceList([instance.hypervisor])
974
975   if instance.name not in running_instances:
976     _Fail("Cannot reboot instance %s that is not running", instance.name)
977
978   hyper = hypervisor.GetHypervisor(instance.hypervisor)
979   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
980     try:
981       hyper.RebootInstance(instance)
982     except errors.HypervisorError, err:
983       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
984   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
985     try:
986       InstanceShutdown(instance)
987       return StartInstance(instance)
988     except errors.HypervisorError, err:
989       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
990   else:
991     _Fail("Invalid reboot_type received: %s", reboot_type)
992
993
994 def MigrationInfo(instance):
995   """Gather information about an instance to be migrated.
996
997   @type instance: L{objects.Instance}
998   @param instance: the instance definition
999
1000   """
1001   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1002   try:
1003     info = hyper.MigrationInfo(instance)
1004   except errors.HypervisorError, err:
1005     _Fail("Failed to fetch migration information: %s", err, exc=True)
1006   return info
1007
1008
1009 def AcceptInstance(instance, info, target):
1010   """Prepare the node to accept an instance.
1011
1012   @type instance: L{objects.Instance}
1013   @param instance: the instance definition
1014   @type info: string/data (opaque)
1015   @param info: migration information, from the source node
1016   @type target: string
1017   @param target: target host (usually ip), on this node
1018
1019   """
1020   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1021   try:
1022     hyper.AcceptInstance(instance, info, target)
1023   except errors.HypervisorError, err:
1024     _Fail("Failed to accept instance: %s", err, exc=True)
1025
1026
1027 def FinalizeMigration(instance, info, success):
1028   """Finalize any preparation to accept an instance.
1029
1030   @type instance: L{objects.Instance}
1031   @param instance: the instance definition
1032   @type info: string/data (opaque)
1033   @param info: migration information, from the source node
1034   @type success: boolean
1035   @param success: whether the migration was a success or a failure
1036
1037   """
1038   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1039   try:
1040     hyper.FinalizeMigration(instance, info, success)
1041   except errors.HypervisorError, err:
1042     _Fail("Failed to finalize migration: %s", err, exc=True)
1043
1044
1045 def MigrateInstance(instance, target, live):
1046   """Migrates an instance to another node.
1047
1048   @type instance: L{objects.Instance}
1049   @param instance: the instance definition
1050   @type target: string
1051   @param target: the target node name
1052   @type live: boolean
1053   @param live: whether the migration should be done live or not (the
1054       interpretation of this parameter is left to the hypervisor)
1055   @rtype: tuple
1056   @return: a tuple of (success, msg) where:
1057       - succes is a boolean denoting the success/failure of the operation
1058       - msg is a string with details in case of failure
1059
1060   """
1061   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1062
1063   try:
1064     hyper.MigrateInstance(instance.name, target, live)
1065   except errors.HypervisorError, err:
1066     _Fail("Failed to migrate instance: %s", err, exc=True)
1067
1068
1069 def BlockdevCreate(disk, size, owner, on_primary, info):
1070   """Creates a block device for an instance.
1071
1072   @type disk: L{objects.Disk}
1073   @param disk: the object describing the disk we should create
1074   @type size: int
1075   @param size: the size of the physical underlying device, in MiB
1076   @type owner: str
1077   @param owner: the name of the instance for which disk is created,
1078       used for device cache data
1079   @type on_primary: boolean
1080   @param on_primary:  indicates if it is the primary node or not
1081   @type info: string
1082   @param info: string that will be sent to the physical device
1083       creation, used for example to set (LVM) tags on LVs
1084
1085   @return: the new unique_id of the device (this can sometime be
1086       computed only after creation), or None. On secondary nodes,
1087       it's not required to return anything.
1088
1089   """
1090   clist = []
1091   if disk.children:
1092     for child in disk.children:
1093       try:
1094         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1095       except errors.BlockDeviceError, err:
1096         _Fail("Can't assemble device %s: %s", child, err)
1097       if on_primary or disk.AssembleOnSecondary():
1098         # we need the children open in case the device itself has to
1099         # be assembled
1100         try:
1101           crdev.Open()
1102         except errors.BlockDeviceError, err:
1103           _Fail("Can't make child '%s' read-write: %s", child, err)
1104       clist.append(crdev)
1105
1106   try:
1107     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1108   except errors.BlockDeviceError, err:
1109     _Fail("Can't create block device: %s", err)
1110
1111   if on_primary or disk.AssembleOnSecondary():
1112     try:
1113       device.Assemble()
1114     except errors.BlockDeviceError, err:
1115       _Fail("Can't assemble device after creation, unusual event: %s", err)
1116     device.SetSyncSpeed(constants.SYNC_SPEED)
1117     if on_primary or disk.OpenOnSecondary():
1118       try:
1119         device.Open(force=True)
1120       except errors.BlockDeviceError, err:
1121         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1122     DevCacheManager.UpdateCache(device.dev_path, owner,
1123                                 on_primary, disk.iv_name)
1124
1125   device.SetInfo(info)
1126
1127   return device.unique_id
1128
1129
1130 def BlockdevRemove(disk):
1131   """Remove a block device.
1132
1133   @note: This is intended to be called recursively.
1134
1135   @type disk: L{objects.Disk}
1136   @param disk: the disk object we should remove
1137   @rtype: boolean
1138   @return: the success of the operation
1139
1140   """
1141   msgs = []
1142   try:
1143     rdev = _RecursiveFindBD(disk)
1144   except errors.BlockDeviceError, err:
1145     # probably can't attach
1146     logging.info("Can't attach to device %s in remove", disk)
1147     rdev = None
1148   if rdev is not None:
1149     r_path = rdev.dev_path
1150     try:
1151       rdev.Remove()
1152     except errors.BlockDeviceError, err:
1153       msgs.append(str(err))
1154     if not msgs:
1155       DevCacheManager.RemoveCache(r_path)
1156
1157   if disk.children:
1158     for child in disk.children:
1159       try:
1160         BlockdevRemove(child)
1161       except RPCFail, err:
1162         msgs.append(str(err))
1163
1164   if msgs:
1165     _Fail("; ".join(msgs))
1166
1167
1168 def _RecursiveAssembleBD(disk, owner, as_primary):
1169   """Activate a block device for an instance.
1170
1171   This is run on the primary and secondary nodes for an instance.
1172
1173   @note: this function is called recursively.
1174
1175   @type disk: L{objects.Disk}
1176   @param disk: the disk we try to assemble
1177   @type owner: str
1178   @param owner: the name of the instance which owns the disk
1179   @type as_primary: boolean
1180   @param as_primary: if we should make the block device
1181       read/write
1182
1183   @return: the assembled device or None (in case no device
1184       was assembled)
1185   @raise errors.BlockDeviceError: in case there is an error
1186       during the activation of the children or the device
1187       itself
1188
1189   """
1190   children = []
1191   if disk.children:
1192     mcn = disk.ChildrenNeeded()
1193     if mcn == -1:
1194       mcn = 0 # max number of Nones allowed
1195     else:
1196       mcn = len(disk.children) - mcn # max number of Nones
1197     for chld_disk in disk.children:
1198       try:
1199         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1200       except errors.BlockDeviceError, err:
1201         if children.count(None) >= mcn:
1202           raise
1203         cdev = None
1204         logging.error("Error in child activation (but continuing): %s",
1205                       str(err))
1206       children.append(cdev)
1207
1208   if as_primary or disk.AssembleOnSecondary():
1209     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1210     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1211     result = r_dev
1212     if as_primary or disk.OpenOnSecondary():
1213       r_dev.Open()
1214     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1215                                 as_primary, disk.iv_name)
1216
1217   else:
1218     result = True
1219   return result
1220
1221
1222 def BlockdevAssemble(disk, owner, as_primary):
1223   """Activate a block device for an instance.
1224
1225   This is a wrapper over _RecursiveAssembleBD.
1226
1227   @rtype: str or boolean
1228   @return: a C{/dev/...} path for primary nodes, and
1229       C{True} for secondary nodes
1230
1231   """
1232   try:
1233     result = _RecursiveAssembleBD(disk, owner, as_primary)
1234     if isinstance(result, bdev.BlockDev):
1235       result = result.dev_path
1236   except errors.BlockDeviceError, err:
1237     _Fail("Error while assembling disk: %s", err, exc=True)
1238
1239   return result
1240
1241
1242 def BlockdevShutdown(disk):
1243   """Shut down a block device.
1244
1245   First, if the device is assembled (Attach() is successful), then
1246   the device is shutdown. Then the children of the device are
1247   shutdown.
1248
1249   This function is called recursively. Note that we don't cache the
1250   children or such, as oppossed to assemble, shutdown of different
1251   devices doesn't require that the upper device was active.
1252
1253   @type disk: L{objects.Disk}
1254   @param disk: the description of the disk we should
1255       shutdown
1256   @rtype: None
1257
1258   """
1259   msgs = []
1260   r_dev = _RecursiveFindBD(disk)
1261   if r_dev is not None:
1262     r_path = r_dev.dev_path
1263     try:
1264       r_dev.Shutdown()
1265       DevCacheManager.RemoveCache(r_path)
1266     except errors.BlockDeviceError, err:
1267       msgs.append(str(err))
1268
1269   if disk.children:
1270     for child in disk.children:
1271       try:
1272         BlockdevShutdown(child)
1273       except RPCFail, err:
1274         msgs.append(str(err))
1275
1276   if msgs:
1277     _Fail("; ".join(msgs))
1278
1279
1280 def BlockdevAddchildren(parent_cdev, new_cdevs):
1281   """Extend a mirrored block device.
1282
1283   @type parent_cdev: L{objects.Disk}
1284   @param parent_cdev: the disk to which we should add children
1285   @type new_cdevs: list of L{objects.Disk}
1286   @param new_cdevs: the list of children which we should add
1287   @rtype: None
1288
1289   """
1290   parent_bdev = _RecursiveFindBD(parent_cdev)
1291   if parent_bdev is None:
1292     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1293   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1294   if new_bdevs.count(None) > 0:
1295     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1296   parent_bdev.AddChildren(new_bdevs)
1297
1298
1299 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1300   """Shrink a mirrored block device.
1301
1302   @type parent_cdev: L{objects.Disk}
1303   @param parent_cdev: the disk from which we should remove children
1304   @type new_cdevs: list of L{objects.Disk}
1305   @param new_cdevs: the list of children which we should remove
1306   @rtype: None
1307
1308   """
1309   parent_bdev = _RecursiveFindBD(parent_cdev)
1310   if parent_bdev is None:
1311     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1312   devs = []
1313   for disk in new_cdevs:
1314     rpath = disk.StaticDevPath()
1315     if rpath is None:
1316       bd = _RecursiveFindBD(disk)
1317       if bd is None:
1318         _Fail("Can't find device %s while removing children", disk)
1319       else:
1320         devs.append(bd.dev_path)
1321     else:
1322       devs.append(rpath)
1323   parent_bdev.RemoveChildren(devs)
1324
1325
1326 def BlockdevGetmirrorstatus(disks):
1327   """Get the mirroring status of a list of devices.
1328
1329   @type disks: list of L{objects.Disk}
1330   @param disks: the list of disks which we should query
1331   @rtype: disk
1332   @return:
1333       a list of (mirror_done, estimated_time) tuples, which
1334       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1335   @raise errors.BlockDeviceError: if any of the disks cannot be
1336       found
1337
1338   """
1339   stats = []
1340   for dsk in disks:
1341     rbd = _RecursiveFindBD(dsk)
1342     if rbd is None:
1343       _Fail("Can't find device %s", dsk)
1344     stats.append(rbd.CombinedSyncStatus())
1345   return stats
1346
1347
1348 def _RecursiveFindBD(disk):
1349   """Check if a device is activated.
1350
1351   If so, return information about the real device.
1352
1353   @type disk: L{objects.Disk}
1354   @param disk: the disk object we need to find
1355
1356   @return: None if the device can't be found,
1357       otherwise the device instance
1358
1359   """
1360   children = []
1361   if disk.children:
1362     for chdisk in disk.children:
1363       children.append(_RecursiveFindBD(chdisk))
1364
1365   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1366
1367
1368 def BlockdevFind(disk):
1369   """Check if a device is activated.
1370
1371   If it is, return information about the real device.
1372
1373   @type disk: L{objects.Disk}
1374   @param disk: the disk to find
1375   @rtype: None or tuple
1376   @return: None if the disk cannot be found, otherwise a
1377       tuple (device_path, major, minor, sync_percent,
1378       estimated_time, is_degraded)
1379
1380   """
1381   try:
1382     rbd = _RecursiveFindBD(disk)
1383   except errors.BlockDeviceError, err:
1384     _Fail("Failed to find device: %s", err, exc=True)
1385   if rbd is None:
1386     return None
1387   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1388
1389
1390 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1391   """Write a file to the filesystem.
1392
1393   This allows the master to overwrite(!) a file. It will only perform
1394   the operation if the file belongs to a list of configuration files.
1395
1396   @type file_name: str
1397   @param file_name: the target file name
1398   @type data: str
1399   @param data: the new contents of the file
1400   @type mode: int
1401   @param mode: the mode to give the file (can be None)
1402   @type uid: int
1403   @param uid: the owner of the file (can be -1 for default)
1404   @type gid: int
1405   @param gid: the group of the file (can be -1 for default)
1406   @type atime: float
1407   @param atime: the atime to set on the file (can be None)
1408   @type mtime: float
1409   @param mtime: the mtime to set on the file (can be None)
1410   @rtype: None
1411
1412   """
1413   if not os.path.isabs(file_name):
1414     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1415
1416   allowed_files = set([
1417     constants.CLUSTER_CONF_FILE,
1418     constants.ETC_HOSTS,
1419     constants.SSH_KNOWN_HOSTS_FILE,
1420     constants.VNC_PASSWORD_FILE,
1421     constants.RAPI_CERT_FILE,
1422     constants.RAPI_USERS_FILE,
1423     ])
1424
1425   for hv_name in constants.HYPER_TYPES:
1426     hv_class = hypervisor.GetHypervisor(hv_name)
1427     allowed_files.update(hv_class.GetAncillaryFiles())
1428
1429   if file_name not in allowed_files:
1430     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1431           file_name)
1432
1433   raw_data = _Decompress(data)
1434
1435   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1436                   atime=atime, mtime=mtime)
1437
1438
1439 def WriteSsconfFiles(values):
1440   """Update all ssconf files.
1441
1442   Wrapper around the SimpleStore.WriteFiles.
1443
1444   """
1445   ssconf.SimpleStore().WriteFiles(values)
1446
1447
1448 def _ErrnoOrStr(err):
1449   """Format an EnvironmentError exception.
1450
1451   If the L{err} argument has an errno attribute, it will be looked up
1452   and converted into a textual C{E...} description. Otherwise the
1453   string representation of the error will be returned.
1454
1455   @type err: L{EnvironmentError}
1456   @param err: the exception to format
1457
1458   """
1459   if hasattr(err, 'errno'):
1460     detail = errno.errorcode[err.errno]
1461   else:
1462     detail = str(err)
1463   return detail
1464
1465
1466 def _OSOndiskAPIVersion(name, os_dir):
1467   """Compute and return the API version of a given OS.
1468
1469   This function will try to read the API version of the OS given by
1470   the 'name' parameter and residing in the 'os_dir' directory.
1471
1472   @type name: str
1473   @param name: the OS name we should look for
1474   @type os_dir: str
1475   @param os_dir: the directory inwhich we should look for the OS
1476   @rtype: tuple
1477   @return: tuple (status, data) with status denoting the validity and
1478       data holding either the vaid versions or an error message
1479
1480   """
1481   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1482
1483   try:
1484     st = os.stat(api_file)
1485   except EnvironmentError, err:
1486     return False, ("Required file 'ganeti_api_version' file not"
1487                    " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
1488
1489   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1490     return False, ("File 'ganeti_api_version' file at %s is not"
1491                    " a regular file" % os_dir)
1492
1493   try:
1494     api_versions = utils.ReadFile(api_file).splitlines()
1495   except EnvironmentError, err:
1496     return False, ("Error while reading the API version file at %s: %s" %
1497                    (api_file, _ErrnoOrStr(err)))
1498
1499   try:
1500     api_versions = [int(version.strip()) for version in api_versions]
1501   except (TypeError, ValueError), err:
1502     return False, ("API version(s) can't be converted to integer: %s" %
1503                    str(err))
1504
1505   return True, api_versions
1506
1507
1508 def DiagnoseOS(top_dirs=None):
1509   """Compute the validity for all OSes.
1510
1511   @type top_dirs: list
1512   @param top_dirs: the list of directories in which to
1513       search (if not given defaults to
1514       L{constants.OS_SEARCH_PATH})
1515   @rtype: list of L{objects.OS}
1516   @return: a list of tuples (name, path, status, diagnose)
1517       for all (potential) OSes under all search paths, where:
1518           - name is the (potential) OS name
1519           - path is the full path to the OS
1520           - status True/False is the validity of the OS
1521           - diagnose is the error message for an invalid OS, otherwise empty
1522
1523   """
1524   if top_dirs is None:
1525     top_dirs = constants.OS_SEARCH_PATH
1526
1527   result = []
1528   for dir_name in top_dirs:
1529     if os.path.isdir(dir_name):
1530       try:
1531         f_names = utils.ListVisibleFiles(dir_name)
1532       except EnvironmentError, err:
1533         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1534         break
1535       for name in f_names:
1536         os_path = os.path.sep.join([dir_name, name])
1537         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1538         if status:
1539           diagnose = ""
1540         else:
1541           diagnose = os_inst
1542         result.append((name, os_path, status, diagnose))
1543
1544   return result
1545
1546
1547 def _TryOSFromDisk(name, base_dir=None):
1548   """Create an OS instance from disk.
1549
1550   This function will return an OS instance if the given name is a
1551   valid OS name.
1552
1553   @type base_dir: string
1554   @keyword base_dir: Base directory containing OS installations.
1555                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1556   @rtype: tuple
1557   @return: success and either the OS instance if we find a valid one,
1558       or error message
1559
1560   """
1561   if base_dir is None:
1562     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1563     if os_dir is None:
1564       return False, "Directory for OS %s not found in search path" % name
1565   else:
1566     os_dir = os.path.sep.join([base_dir, name])
1567
1568   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1569   if not status:
1570     # push the error up
1571     return status, api_versions
1572
1573   if not constants.OS_API_VERSIONS.intersection(api_versions):
1574     return False, ("API version mismatch for path '%s': found %s, want %s." %
1575                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1576
1577   # OS Scripts dictionary, we will populate it with the actual script names
1578   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1579
1580   for script in os_scripts:
1581     os_scripts[script] = os.path.sep.join([os_dir, script])
1582
1583     try:
1584       st = os.stat(os_scripts[script])
1585     except EnvironmentError, err:
1586       return False, ("Script '%s' under path '%s' is missing (%s)" %
1587                      (script, os_dir, _ErrnoOrStr(err)))
1588
1589     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1590       return False, ("Script '%s' under path '%s' is not executable" %
1591                      (script, os_dir))
1592
1593     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1594       return False, ("Script '%s' under path '%s' is not a regular file" %
1595                      (script, os_dir))
1596
1597   os_obj = objects.OS(name=name, path=os_dir,
1598                       create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1599                       export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1600                       import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1601                       rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1602                       api_versions=api_versions)
1603   return True, os_obj
1604
1605
1606 def OSFromDisk(name, base_dir=None):
1607   """Create an OS instance from disk.
1608
1609   This function will return an OS instance if the given name is a
1610   valid OS name. Otherwise, it will raise an appropriate
1611   L{RPCFail} exception, detailing why this is not a valid OS.
1612
1613   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1614   an exception but returns true/false status data.
1615
1616   @type base_dir: string
1617   @keyword base_dir: Base directory containing OS installations.
1618                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1619   @rtype: L{objects.OS}
1620   @return: the OS instance if we find a valid one
1621   @raise RPCFail: if we don't find a valid OS
1622
1623   """
1624   status, payload = _TryOSFromDisk(name, base_dir)
1625
1626   if not status:
1627     _Fail(payload)
1628
1629   return payload
1630
1631
1632 def OSEnvironment(instance, os, debug=0):
1633   """Calculate the environment for an os script.
1634
1635   @type instance: L{objects.Instance}
1636   @param instance: target instance for the os script run
1637   @type os: L{objects.OS}
1638   @param os: operating system for which the environment is being built
1639   @type debug: integer
1640   @param debug: debug level (0 or 1, for OS Api 10)
1641   @rtype: dict
1642   @return: dict of environment variables
1643   @raise errors.BlockDeviceError: if the block device
1644       cannot be found
1645
1646   """
1647   result = {}
1648   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1649   result['OS_API_VERSION'] = '%d' % api_version
1650   result['INSTANCE_NAME'] = instance.name
1651   result['INSTANCE_OS'] = instance.os
1652   result['HYPERVISOR'] = instance.hypervisor
1653   result['DISK_COUNT'] = '%d' % len(instance.disks)
1654   result['NIC_COUNT'] = '%d' % len(instance.nics)
1655   result['DEBUG_LEVEL'] = '%d' % debug
1656   for idx, disk in enumerate(instance.disks):
1657     real_disk = _RecursiveFindBD(disk)
1658     if real_disk is None:
1659       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1660                                     str(disk))
1661     real_disk.Open()
1662     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1663     result['DISK_%d_ACCESS' % idx] = disk.mode
1664     if constants.HV_DISK_TYPE in instance.hvparams:
1665       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1666         instance.hvparams[constants.HV_DISK_TYPE]
1667     if disk.dev_type in constants.LDS_BLOCK:
1668       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1669     elif disk.dev_type == constants.LD_FILE:
1670       result['DISK_%d_BACKEND_TYPE' % idx] = \
1671         'file:%s' % disk.physical_id[0]
1672   for idx, nic in enumerate(instance.nics):
1673     result['NIC_%d_MAC' % idx] = nic.mac
1674     if nic.ip:
1675       result['NIC_%d_IP' % idx] = nic.ip
1676     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1677     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1678       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1679     if nic.nicparams[constants.NIC_LINK]:
1680       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1681     if constants.HV_NIC_TYPE in instance.hvparams:
1682       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1683         instance.hvparams[constants.HV_NIC_TYPE]
1684
1685   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1686     for key, value in source.items():
1687       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1688
1689   return result
1690
1691 def BlockdevGrow(disk, amount):
1692   """Grow a stack of block devices.
1693
1694   This function is called recursively, with the childrens being the
1695   first ones to resize.
1696
1697   @type disk: L{objects.Disk}
1698   @param disk: the disk to be grown
1699   @rtype: (status, result)
1700   @return: a tuple with the status of the operation
1701       (True/False), and the errors message if status
1702       is False
1703
1704   """
1705   r_dev = _RecursiveFindBD(disk)
1706   if r_dev is None:
1707     _Fail("Cannot find block device %s", disk)
1708
1709   try:
1710     r_dev.Grow(amount)
1711   except errors.BlockDeviceError, err:
1712     _Fail("Failed to grow block device: %s", err, exc=True)
1713
1714
1715 def BlockdevSnapshot(disk):
1716   """Create a snapshot copy of a block device.
1717
1718   This function is called recursively, and the snapshot is actually created
1719   just for the leaf lvm backend device.
1720
1721   @type disk: L{objects.Disk}
1722   @param disk: the disk to be snapshotted
1723   @rtype: string
1724   @return: snapshot disk path
1725
1726   """
1727   if disk.children:
1728     if len(disk.children) == 1:
1729       # only one child, let's recurse on it
1730       return BlockdevSnapshot(disk.children[0])
1731     else:
1732       # more than one child, choose one that matches
1733       for child in disk.children:
1734         if child.size == disk.size:
1735           # return implies breaking the loop
1736           return BlockdevSnapshot(child)
1737   elif disk.dev_type == constants.LD_LV:
1738     r_dev = _RecursiveFindBD(disk)
1739     if r_dev is not None:
1740       # let's stay on the safe side and ask for the full size, for now
1741       return r_dev.Snapshot(disk.size)
1742     else:
1743       _Fail("Cannot find block device %s", disk)
1744   else:
1745     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1746           disk.unique_id, disk.dev_type)
1747
1748
1749 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1750   """Export a block device snapshot to a remote node.
1751
1752   @type disk: L{objects.Disk}
1753   @param disk: the description of the disk to export
1754   @type dest_node: str
1755   @param dest_node: the destination node to export to
1756   @type instance: L{objects.Instance}
1757   @param instance: the instance object to whom the disk belongs
1758   @type cluster_name: str
1759   @param cluster_name: the cluster name, needed for SSH hostalias
1760   @type idx: int
1761   @param idx: the index of the disk in the instance's disk list,
1762       used to export to the OS scripts environment
1763   @rtype: None
1764
1765   """
1766   inst_os = OSFromDisk(instance.os)
1767   export_env = OSEnvironment(instance, inst_os)
1768
1769   export_script = inst_os.export_script
1770
1771   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1772                                      instance.name, int(time.time()))
1773   if not os.path.exists(constants.LOG_OS_DIR):
1774     os.mkdir(constants.LOG_OS_DIR, 0750)
1775   real_disk = _RecursiveFindBD(disk)
1776   if real_disk is None:
1777     _Fail("Block device '%s' is not set up", disk)
1778
1779   real_disk.Open()
1780
1781   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1782   export_env['EXPORT_INDEX'] = str(idx)
1783
1784   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1785   destfile = disk.physical_id[1]
1786
1787   # the target command is built out of three individual commands,
1788   # which are joined by pipes; we check each individual command for
1789   # valid parameters
1790   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1791                                export_script, logfile)
1792
1793   comprcmd = "gzip"
1794
1795   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1796                                 destdir, destdir, destfile)
1797   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1798                                                    constants.GANETI_RUNAS,
1799                                                    destcmd)
1800
1801   # all commands have been checked, so we're safe to combine them
1802   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1803
1804   result = utils.RunCmd(command, env=export_env)
1805
1806   if result.failed:
1807     _Fail("OS snapshot export command '%s' returned error: %s"
1808           " output: %s", command, result.fail_reason, result.output)
1809
1810
1811 def FinalizeExport(instance, snap_disks):
1812   """Write out the export configuration information.
1813
1814   @type instance: L{objects.Instance}
1815   @param instance: the instance which we export, used for
1816       saving configuration
1817   @type snap_disks: list of L{objects.Disk}
1818   @param snap_disks: list of snapshot block devices, which
1819       will be used to get the actual name of the dump file
1820
1821   @rtype: None
1822
1823   """
1824   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1825   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1826
1827   config = objects.SerializableConfigParser()
1828
1829   config.add_section(constants.INISECT_EXP)
1830   config.set(constants.INISECT_EXP, 'version', '0')
1831   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1832   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1833   config.set(constants.INISECT_EXP, 'os', instance.os)
1834   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1835
1836   config.add_section(constants.INISECT_INS)
1837   config.set(constants.INISECT_INS, 'name', instance.name)
1838   config.set(constants.INISECT_INS, 'memory', '%d' %
1839              instance.beparams[constants.BE_MEMORY])
1840   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1841              instance.beparams[constants.BE_VCPUS])
1842   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1843
1844   nic_total = 0
1845   for nic_count, nic in enumerate(instance.nics):
1846     nic_total += 1
1847     config.set(constants.INISECT_INS, 'nic%d_mac' %
1848                nic_count, '%s' % nic.mac)
1849     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1850     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1851                '%s' % nic.bridge)
1852   # TODO: redundant: on load can read nics until it doesn't exist
1853   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1854
1855   disk_total = 0
1856   for disk_count, disk in enumerate(snap_disks):
1857     if disk:
1858       disk_total += 1
1859       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1860                  ('%s' % disk.iv_name))
1861       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1862                  ('%s' % disk.physical_id[1]))
1863       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1864                  ('%d' % disk.size))
1865
1866   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1867
1868   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1869                   data=config.Dumps())
1870   shutil.rmtree(finaldestdir, True)
1871   shutil.move(destdir, finaldestdir)
1872
1873
1874 def ExportInfo(dest):
1875   """Get export configuration information.
1876
1877   @type dest: str
1878   @param dest: directory containing the export
1879
1880   @rtype: L{objects.SerializableConfigParser}
1881   @return: a serializable config file containing the
1882       export info
1883
1884   """
1885   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1886
1887   config = objects.SerializableConfigParser()
1888   config.read(cff)
1889
1890   if (not config.has_section(constants.INISECT_EXP) or
1891       not config.has_section(constants.INISECT_INS)):
1892     _Fail("Export info file doesn't have the required fields")
1893
1894   return config.Dumps()
1895
1896
1897 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1898   """Import an os image into an instance.
1899
1900   @type instance: L{objects.Instance}
1901   @param instance: instance to import the disks into
1902   @type src_node: string
1903   @param src_node: source node for the disk images
1904   @type src_images: list of string
1905   @param src_images: absolute paths of the disk images
1906   @rtype: list of boolean
1907   @return: each boolean represent the success of importing the n-th disk
1908
1909   """
1910   inst_os = OSFromDisk(instance.os)
1911   import_env = OSEnvironment(instance, inst_os)
1912   import_script = inst_os.import_script
1913
1914   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1915                                         instance.name, int(time.time()))
1916   if not os.path.exists(constants.LOG_OS_DIR):
1917     os.mkdir(constants.LOG_OS_DIR, 0750)
1918
1919   comprcmd = "gunzip"
1920   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1921                                import_script, logfile)
1922
1923   final_result = []
1924   for idx, image in enumerate(src_images):
1925     if image:
1926       destcmd = utils.BuildShellCmd('cat %s', image)
1927       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1928                                                        constants.GANETI_RUNAS,
1929                                                        destcmd)
1930       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1931       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1932       import_env['IMPORT_INDEX'] = str(idx)
1933       result = utils.RunCmd(command, env=import_env)
1934       if result.failed:
1935         logging.error("Disk import command '%s' returned error: %s"
1936                       " output: %s", command, result.fail_reason,
1937                       result.output)
1938         final_result.append("error importing disk %d: %s, %s" %
1939                             (idx, result.fail_reason, result.output[-100]))
1940
1941   if final_result:
1942     _Fail("; ".join(final_result), log=False)
1943
1944
1945 def ListExports():
1946   """Return a list of exports currently available on this machine.
1947
1948   @rtype: list
1949   @return: list of the exports
1950
1951   """
1952   if os.path.isdir(constants.EXPORT_DIR):
1953     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1954   else:
1955     _Fail("No exports directory")
1956
1957
1958 def RemoveExport(export):
1959   """Remove an existing export from the node.
1960
1961   @type export: str
1962   @param export: the name of the export to remove
1963   @rtype: None
1964
1965   """
1966   target = os.path.join(constants.EXPORT_DIR, export)
1967
1968   try:
1969     shutil.rmtree(target)
1970   except EnvironmentError, err:
1971     _Fail("Error while removing the export: %s", err, exc=True)
1972
1973
1974 def BlockdevRename(devlist):
1975   """Rename a list of block devices.
1976
1977   @type devlist: list of tuples
1978   @param devlist: list of tuples of the form  (disk,
1979       new_logical_id, new_physical_id); disk is an
1980       L{objects.Disk} object describing the current disk,
1981       and new logical_id/physical_id is the name we
1982       rename it to
1983   @rtype: boolean
1984   @return: True if all renames succeeded, False otherwise
1985
1986   """
1987   msgs = []
1988   result = True
1989   for disk, unique_id in devlist:
1990     dev = _RecursiveFindBD(disk)
1991     if dev is None:
1992       msgs.append("Can't find device %s in rename" % str(disk))
1993       result = False
1994       continue
1995     try:
1996       old_rpath = dev.dev_path
1997       dev.Rename(unique_id)
1998       new_rpath = dev.dev_path
1999       if old_rpath != new_rpath:
2000         DevCacheManager.RemoveCache(old_rpath)
2001         # FIXME: we should add the new cache information here, like:
2002         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2003         # but we don't have the owner here - maybe parse from existing
2004         # cache? for now, we only lose lvm data when we rename, which
2005         # is less critical than DRBD or MD
2006     except errors.BlockDeviceError, err:
2007       msgs.append("Can't rename device '%s' to '%s': %s" %
2008                   (dev, unique_id, err))
2009       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2010       result = False
2011   if not result:
2012     _Fail("; ".join(msgs))
2013
2014
2015 def _TransformFileStorageDir(file_storage_dir):
2016   """Checks whether given file_storage_dir is valid.
2017
2018   Checks wheter the given file_storage_dir is within the cluster-wide
2019   default file_storage_dir stored in SimpleStore. Only paths under that
2020   directory are allowed.
2021
2022   @type file_storage_dir: str
2023   @param file_storage_dir: the path to check
2024
2025   @return: the normalized path if valid, None otherwise
2026
2027   """
2028   cfg = _GetConfig()
2029   file_storage_dir = os.path.normpath(file_storage_dir)
2030   base_file_storage_dir = cfg.GetFileStorageDir()
2031   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2032       base_file_storage_dir):
2033     _Fail("File storage directory '%s' is not under base file"
2034           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2035   return file_storage_dir
2036
2037
2038 def CreateFileStorageDir(file_storage_dir):
2039   """Create file storage directory.
2040
2041   @type file_storage_dir: str
2042   @param file_storage_dir: directory to create
2043
2044   @rtype: tuple
2045   @return: tuple with first element a boolean indicating wheter dir
2046       creation was successful or not
2047
2048   """
2049   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2050   if os.path.exists(file_storage_dir):
2051     if not os.path.isdir(file_storage_dir):
2052       _Fail("Specified storage dir '%s' is not a directory",
2053             file_storage_dir)
2054   else:
2055     try:
2056       os.makedirs(file_storage_dir, 0750)
2057     except OSError, err:
2058       _Fail("Cannot create file storage directory '%s': %s",
2059             file_storage_dir, err, exc=True)
2060
2061
2062 def RemoveFileStorageDir(file_storage_dir):
2063   """Remove file storage directory.
2064
2065   Remove it only if it's empty. If not log an error and return.
2066
2067   @type file_storage_dir: str
2068   @param file_storage_dir: the directory we should cleanup
2069   @rtype: tuple (success,)
2070   @return: tuple of one element, C{success}, denoting
2071       whether the operation was successful
2072
2073   """
2074   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2075   if os.path.exists(file_storage_dir):
2076     if not os.path.isdir(file_storage_dir):
2077       _Fail("Specified Storage directory '%s' is not a directory",
2078             file_storage_dir)
2079     # deletes dir only if empty, otherwise we want to fail the rpc call
2080     try:
2081       os.rmdir(file_storage_dir)
2082     except OSError, err:
2083       _Fail("Cannot remove file storage directory '%s': %s",
2084             file_storage_dir, err)
2085
2086
2087 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2088   """Rename the file storage directory.
2089
2090   @type old_file_storage_dir: str
2091   @param old_file_storage_dir: the current path
2092   @type new_file_storage_dir: str
2093   @param new_file_storage_dir: the name we should rename to
2094   @rtype: tuple (success,)
2095   @return: tuple of one element, C{success}, denoting
2096       whether the operation was successful
2097
2098   """
2099   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2100   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2101   if not os.path.exists(new_file_storage_dir):
2102     if os.path.isdir(old_file_storage_dir):
2103       try:
2104         os.rename(old_file_storage_dir, new_file_storage_dir)
2105       except OSError, err:
2106         _Fail("Cannot rename '%s' to '%s': %s",
2107               old_file_storage_dir, new_file_storage_dir, err)
2108     else:
2109       _Fail("Specified storage dir '%s' is not a directory",
2110             old_file_storage_dir)
2111   else:
2112     if os.path.exists(old_file_storage_dir):
2113       _Fail("Cannot rename '%s' to '%s': both locations exist",
2114             old_file_storage_dir, new_file_storage_dir)
2115
2116
2117 def _EnsureJobQueueFile(file_name):
2118   """Checks whether the given filename is in the queue directory.
2119
2120   @type file_name: str
2121   @param file_name: the file name we should check
2122   @rtype: None
2123   @raises RPCFail: if the file is not valid
2124
2125   """
2126   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2127   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2128
2129   if not result:
2130     _Fail("Passed job queue file '%s' does not belong to"
2131           " the queue directory '%s'", file_name, queue_dir)
2132
2133
2134 def JobQueueUpdate(file_name, content):
2135   """Updates a file in the queue directory.
2136
2137   This is just a wrapper over L{utils.WriteFile}, with proper
2138   checking.
2139
2140   @type file_name: str
2141   @param file_name: the job file name
2142   @type content: str
2143   @param content: the new job contents
2144   @rtype: boolean
2145   @return: the success of the operation
2146
2147   """
2148   _EnsureJobQueueFile(file_name)
2149
2150   # Write and replace the file atomically
2151   utils.WriteFile(file_name, data=_Decompress(content))
2152
2153
2154 def JobQueueRename(old, new):
2155   """Renames a job queue file.
2156
2157   This is just a wrapper over os.rename with proper checking.
2158
2159   @type old: str
2160   @param old: the old (actual) file name
2161   @type new: str
2162   @param new: the desired file name
2163   @rtype: tuple
2164   @return: the success of the operation and payload
2165
2166   """
2167   _EnsureJobQueueFile(old)
2168   _EnsureJobQueueFile(new)
2169
2170   utils.RenameFile(old, new, mkdir=True)
2171
2172
2173 def JobQueueSetDrainFlag(drain_flag):
2174   """Set the drain flag for the queue.
2175
2176   This will set or unset the queue drain flag.
2177
2178   @type drain_flag: boolean
2179   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2180   @rtype: truple
2181   @return: always True, None
2182   @warning: the function always returns True
2183
2184   """
2185   if drain_flag:
2186     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2187   else:
2188     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2189
2190
2191 def BlockdevClose(instance_name, disks):
2192   """Closes the given block devices.
2193
2194   This means they will be switched to secondary mode (in case of
2195   DRBD).
2196
2197   @param instance_name: if the argument is not empty, the symlinks
2198       of this instance will be removed
2199   @type disks: list of L{objects.Disk}
2200   @param disks: the list of disks to be closed
2201   @rtype: tuple (success, message)
2202   @return: a tuple of success and message, where success
2203       indicates the succes of the operation, and message
2204       which will contain the error details in case we
2205       failed
2206
2207   """
2208   bdevs = []
2209   for cf in disks:
2210     rd = _RecursiveFindBD(cf)
2211     if rd is None:
2212       _Fail("Can't find device %s", cf)
2213     bdevs.append(rd)
2214
2215   msg = []
2216   for rd in bdevs:
2217     try:
2218       rd.Close()
2219     except errors.BlockDeviceError, err:
2220       msg.append(str(err))
2221   if msg:
2222     _Fail("Can't make devices secondary: %s", ",".join(msg))
2223   else:
2224     if instance_name:
2225       _RemoveBlockDevLinks(instance_name, disks)
2226
2227
2228 def ValidateHVParams(hvname, hvparams):
2229   """Validates the given hypervisor parameters.
2230
2231   @type hvname: string
2232   @param hvname: the hypervisor name
2233   @type hvparams: dict
2234   @param hvparams: the hypervisor parameters to be validated
2235   @rtype: None
2236
2237   """
2238   try:
2239     hv_type = hypervisor.GetHypervisor(hvname)
2240     hv_type.ValidateParameters(hvparams)
2241   except errors.HypervisorError, err:
2242     _Fail(str(err), log=False)
2243
2244
2245 def DemoteFromMC():
2246   """Demotes the current node from master candidate role.
2247
2248   """
2249   # try to ensure we're not the master by mistake
2250   master, myself = ssconf.GetMasterAndMyself()
2251   if master == myself:
2252     _Fail("ssconf status shows I'm the master node, will not demote")
2253   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2254   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2255     _Fail("The master daemon is running, will not demote")
2256   try:
2257     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2258       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2259   except EnvironmentError, err:
2260     if err.errno != errno.ENOENT:
2261       _Fail("Error while backing up cluster file: %s", err, exc=True)
2262   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2263
2264
2265 def _FindDisks(nodes_ip, disks):
2266   """Sets the physical ID on disks and returns the block devices.
2267
2268   """
2269   # set the correct physical ID
2270   my_name = utils.HostInfo().name
2271   for cf in disks:
2272     cf.SetPhysicalID(my_name, nodes_ip)
2273
2274   bdevs = []
2275
2276   for cf in disks:
2277     rd = _RecursiveFindBD(cf)
2278     if rd is None:
2279       _Fail("Can't find device %s", cf)
2280     bdevs.append(rd)
2281   return bdevs
2282
2283
2284 def DrbdDisconnectNet(nodes_ip, disks):
2285   """Disconnects the network on a list of drbd devices.
2286
2287   """
2288   bdevs = _FindDisks(nodes_ip, disks)
2289
2290   # disconnect disks
2291   for rd in bdevs:
2292     try:
2293       rd.DisconnectNet()
2294     except errors.BlockDeviceError, err:
2295       _Fail("Can't change network configuration to standalone mode: %s",
2296             err, exc=True)
2297
2298
2299 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2300   """Attaches the network on a list of drbd devices.
2301
2302   """
2303   bdevs = _FindDisks(nodes_ip, disks)
2304
2305   if multimaster:
2306     for idx, rd in enumerate(bdevs):
2307       try:
2308         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2309       except EnvironmentError, err:
2310         _Fail("Can't create symlink: %s", err)
2311   # reconnect disks, switch to new master configuration and if
2312   # needed primary mode
2313   for rd in bdevs:
2314     try:
2315       rd.AttachNet(multimaster)
2316     except errors.BlockDeviceError, err:
2317       _Fail("Can't change network configuration: %s", err)
2318   # wait until the disks are connected; we need to retry the re-attach
2319   # if the device becomes standalone, as this might happen if the one
2320   # node disconnects and reconnects in a different mode before the
2321   # other node reconnects; in this case, one or both of the nodes will
2322   # decide it has wrong configuration and switch to standalone
2323   RECONNECT_TIMEOUT = 2 * 60
2324   sleep_time = 0.100 # start with 100 miliseconds
2325   timeout_limit = time.time() + RECONNECT_TIMEOUT
2326   while time.time() < timeout_limit:
2327     all_connected = True
2328     for rd in bdevs:
2329       stats = rd.GetProcStatus()
2330       if not (stats.is_connected or stats.is_in_resync):
2331         all_connected = False
2332       if stats.is_standalone:
2333         # peer had different config info and this node became
2334         # standalone, even though this should not happen with the
2335         # new staged way of changing disk configs
2336         try:
2337           rd.AttachNet(multimaster)
2338         except errors.BlockDeviceError, err:
2339           _Fail("Can't change network configuration: %s", err)
2340     if all_connected:
2341       break
2342     time.sleep(sleep_time)
2343     sleep_time = min(5, sleep_time * 1.5)
2344   if not all_connected:
2345     _Fail("Timeout in disk reconnecting")
2346   if multimaster:
2347     # change to primary mode
2348     for rd in bdevs:
2349       try:
2350         rd.Open()
2351       except errors.BlockDeviceError, err:
2352         _Fail("Can't change to primary mode: %s", err)
2353
2354
2355 def DrbdWaitSync(nodes_ip, disks):
2356   """Wait until DRBDs have synchronized.
2357
2358   """
2359   bdevs = _FindDisks(nodes_ip, disks)
2360
2361   min_resync = 100
2362   alldone = True
2363   for rd in bdevs:
2364     stats = rd.GetProcStatus()
2365     if not (stats.is_connected or stats.is_in_resync):
2366       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2367     alldone = alldone and (not stats.is_in_resync)
2368     if stats.sync_percent is not None:
2369       min_resync = min(min_resync, stats.sync_percent)
2370
2371   return (alldone, min_resync)
2372
2373
2374 def PowercycleNode(hypervisor_type):
2375   """Hard-powercycle the node.
2376
2377   Because we need to return first, and schedule the powercycle in the
2378   background, we won't be able to report failures nicely.
2379
2380   """
2381   hyper = hypervisor.GetHypervisor(hypervisor_type)
2382   try:
2383     pid = os.fork()
2384   except OSError:
2385     # if we can't fork, we'll pretend that we're in the child process
2386     pid = 0
2387   if pid > 0:
2388     return "Reboot scheduled in 5 seconds"
2389   time.sleep(5)
2390   hyper.PowercycleNode()
2391
2392
2393 class HooksRunner(object):
2394   """Hook runner.
2395
2396   This class is instantiated on the node side (ganeti-noded) and not
2397   on the master side.
2398
2399   """
2400   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2401
2402   def __init__(self, hooks_base_dir=None):
2403     """Constructor for hooks runner.
2404
2405     @type hooks_base_dir: str or None
2406     @param hooks_base_dir: if not None, this overrides the
2407         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2408
2409     """
2410     if hooks_base_dir is None:
2411       hooks_base_dir = constants.HOOKS_BASE_DIR
2412     self._BASE_DIR = hooks_base_dir
2413
2414   @staticmethod
2415   def ExecHook(script, env):
2416     """Exec one hook script.
2417
2418     @type script: str
2419     @param script: the full path to the script
2420     @type env: dict
2421     @param env: the environment with which to exec the script
2422     @rtype: tuple (success, message)
2423     @return: a tuple of success and message, where success
2424         indicates the succes of the operation, and message
2425         which will contain the error details in case we
2426         failed
2427
2428     """
2429     # exec the process using subprocess and log the output
2430     fdstdin = None
2431     try:
2432       fdstdin = open("/dev/null", "r")
2433       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2434                                stderr=subprocess.STDOUT, close_fds=True,
2435                                shell=False, cwd="/", env=env)
2436       output = ""
2437       try:
2438         output = child.stdout.read(4096)
2439         child.stdout.close()
2440       except EnvironmentError, err:
2441         output += "Hook script error: %s" % str(err)
2442
2443       while True:
2444         try:
2445           result = child.wait()
2446           break
2447         except EnvironmentError, err:
2448           if err.errno == errno.EINTR:
2449             continue
2450           raise
2451     finally:
2452       # try not to leak fds
2453       for fd in (fdstdin, ):
2454         if fd is not None:
2455           try:
2456             fd.close()
2457           except EnvironmentError, err:
2458             # just log the error
2459             #logging.exception("Error while closing fd %s", fd)
2460             pass
2461
2462     return result == 0, utils.SafeEncode(output.strip())
2463
2464   def RunHooks(self, hpath, phase, env):
2465     """Run the scripts in the hooks directory.
2466
2467     @type hpath: str
2468     @param hpath: the path to the hooks directory which
2469         holds the scripts
2470     @type phase: str
2471     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2472         L{constants.HOOKS_PHASE_POST}
2473     @type env: dict
2474     @param env: dictionary with the environment for the hook
2475     @rtype: list
2476     @return: list of 3-element tuples:
2477       - script path
2478       - script result, either L{constants.HKR_SUCCESS} or
2479         L{constants.HKR_FAIL}
2480       - output of the script
2481
2482     @raise errors.ProgrammerError: for invalid input
2483         parameters
2484
2485     """
2486     if phase == constants.HOOKS_PHASE_PRE:
2487       suffix = "pre"
2488     elif phase == constants.HOOKS_PHASE_POST:
2489       suffix = "post"
2490     else:
2491       _Fail("Unknown hooks phase '%s'", phase)
2492
2493     rr = []
2494
2495     subdir = "%s-%s.d" % (hpath, suffix)
2496     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2497     try:
2498       dir_contents = utils.ListVisibleFiles(dir_name)
2499     except OSError:
2500       # FIXME: must log output in case of failures
2501       return rr
2502
2503     # we use the standard python sort order,
2504     # so 00name is the recommended naming scheme
2505     dir_contents.sort()
2506     for relname in dir_contents:
2507       fname = os.path.join(dir_name, relname)
2508       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2509           self.RE_MASK.match(relname) is not None):
2510         rrval = constants.HKR_SKIP
2511         output = ""
2512       else:
2513         result, output = self.ExecHook(fname, env)
2514         if not result:
2515           rrval = constants.HKR_FAIL
2516         else:
2517           rrval = constants.HKR_SUCCESS
2518       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2519
2520     return rr
2521
2522
2523 class IAllocatorRunner(object):
2524   """IAllocator runner.
2525
2526   This class is instantiated on the node side (ganeti-noded) and not on
2527   the master side.
2528
2529   """
2530   def Run(self, name, idata):
2531     """Run an iallocator script.
2532
2533     @type name: str
2534     @param name: the iallocator script name
2535     @type idata: str
2536     @param idata: the allocator input data
2537
2538     @rtype: tuple
2539     @return: two element tuple of:
2540        - status
2541        - either error message or stdout of allocator (for success)
2542
2543     """
2544     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2545                                   os.path.isfile)
2546     if alloc_script is None:
2547       _Fail("iallocator module '%s' not found in the search path", name)
2548
2549     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2550     try:
2551       os.write(fd, idata)
2552       os.close(fd)
2553       result = utils.RunCmd([alloc_script, fin_name])
2554       if result.failed:
2555         _Fail("iallocator module '%s' failed: %s, output '%s'",
2556               name, result.fail_reason, result.output)
2557     finally:
2558       os.unlink(fin_name)
2559
2560     return result.stdout
2561
2562
2563 class DevCacheManager(object):
2564   """Simple class for managing a cache of block device information.
2565
2566   """
2567   _DEV_PREFIX = "/dev/"
2568   _ROOT_DIR = constants.BDEV_CACHE_DIR
2569
2570   @classmethod
2571   def _ConvertPath(cls, dev_path):
2572     """Converts a /dev/name path to the cache file name.
2573
2574     This replaces slashes with underscores and strips the /dev
2575     prefix. It then returns the full path to the cache file.
2576
2577     @type dev_path: str
2578     @param dev_path: the C{/dev/} path name
2579     @rtype: str
2580     @return: the converted path name
2581
2582     """
2583     if dev_path.startswith(cls._DEV_PREFIX):
2584       dev_path = dev_path[len(cls._DEV_PREFIX):]
2585     dev_path = dev_path.replace("/", "_")
2586     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2587     return fpath
2588
2589   @classmethod
2590   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2591     """Updates the cache information for a given device.
2592
2593     @type dev_path: str
2594     @param dev_path: the pathname of the device
2595     @type owner: str
2596     @param owner: the owner (instance name) of the device
2597     @type on_primary: bool
2598     @param on_primary: whether this is the primary
2599         node nor not
2600     @type iv_name: str
2601     @param iv_name: the instance-visible name of the
2602         device, as in objects.Disk.iv_name
2603
2604     @rtype: None
2605
2606     """
2607     if dev_path is None:
2608       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2609       return
2610     fpath = cls._ConvertPath(dev_path)
2611     if on_primary:
2612       state = "primary"
2613     else:
2614       state = "secondary"
2615     if iv_name is None:
2616       iv_name = "not_visible"
2617     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2618     try:
2619       utils.WriteFile(fpath, data=fdata)
2620     except EnvironmentError, err:
2621       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2622
2623   @classmethod
2624   def RemoveCache(cls, dev_path):
2625     """Remove data for a dev_path.
2626
2627     This is just a wrapper over L{utils.RemoveFile} with a converted
2628     path name and logging.
2629
2630     @type dev_path: str
2631     @param dev_path: the pathname of the device
2632
2633     @rtype: None
2634
2635     """
2636     if dev_path is None:
2637       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2638       return
2639     fpath = cls._ConvertPath(dev_path)
2640     try:
2641       utils.RemoveFile(fpath)
2642     except EnvironmentError, err:
2643       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)