Fix various pylint warnings
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 class RPCFail(Exception):
50   """Class denoting RPC failure.
51
52   Its argument is the error message.
53
54   """
55
56 def _Fail(msg, *args, **kwargs):
57   """Log an error and the raise an RPCFail exception.
58
59   This exception is then handled specially in the ganeti daemon and
60   turned into a 'failed' return type. As such, this function is a
61   useful shortcut for logging the error and returning it to the master
62   daemon.
63
64   @type msg: string
65   @param msg: the text of the exception
66   @raise RPCFail
67
68   """
69   if args:
70     msg = msg % args
71   if "log" not in kwargs or kwargs["log"]: # if we should log this error
72     if "exc" in kwargs and kwargs["exc"]:
73       logging.exception(msg)
74     else:
75       logging.error(msg)
76   raise RPCFail(msg)
77
78
79 def _GetConfig():
80   """Simple wrapper to return a SimpleStore.
81
82   @rtype: L{ssconf.SimpleStore}
83   @return: a SimpleStore instance
84
85   """
86   return ssconf.SimpleStore()
87
88
89 def _GetSshRunner(cluster_name):
90   """Simple wrapper to return an SshRunner.
91
92   @type cluster_name: str
93   @param cluster_name: the cluster name, which is needed
94       by the SshRunner constructor
95   @rtype: L{ssh.SshRunner}
96   @return: an SshRunner instance
97
98   """
99   return ssh.SshRunner(cluster_name)
100
101
102 def _Decompress(data):
103   """Unpacks data compressed by the RPC client.
104
105   @type data: list or tuple
106   @param data: Data sent by RPC client
107   @rtype: str
108   @return: Decompressed data
109
110   """
111   assert isinstance(data, (list, tuple))
112   assert len(data) == 2
113   (encoding, content) = data
114   if encoding == constants.RPC_ENCODING_NONE:
115     return content
116   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
117     return zlib.decompress(base64.b64decode(content))
118   else:
119     raise AssertionError("Unknown data encoding")
120
121
122 def _CleanDirectory(path, exclude=None):
123   """Removes all regular files in a directory.
124
125   @type path: str
126   @param path: the directory to clean
127   @type exclude: list
128   @param exclude: list of files to be excluded, defaults
129       to the empty list
130
131   """
132   if not os.path.isdir(path):
133     return
134   if exclude is None:
135     exclude = []
136   else:
137     # Normalize excluded paths
138     exclude = [os.path.normpath(i) for i in exclude]
139
140   for rel_name in utils.ListVisibleFiles(path):
141     full_name = os.path.normpath(os.path.join(path, rel_name))
142     if full_name in exclude:
143       continue
144     if os.path.isfile(full_name) and not os.path.islink(full_name):
145       utils.RemoveFile(full_name)
146
147
148 def JobQueuePurge():
149   """Removes job queue files and archived jobs.
150
151   @rtype: tuple
152   @return: True, None
153
154   """
155   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
156   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
157
158
159 def GetMasterInfo():
160   """Returns master information.
161
162   This is an utility function to compute master information, either
163   for consumption here or from the node daemon.
164
165   @rtype: tuple
166   @return: master_netdev, master_ip, master_name
167   @raise RPCFail: in case of errors
168
169   """
170   try:
171     cfg = _GetConfig()
172     master_netdev = cfg.GetMasterNetdev()
173     master_ip = cfg.GetMasterIP()
174     master_node = cfg.GetMasterNode()
175   except errors.ConfigurationError, err:
176     _Fail("Cluster configuration incomplete: %s", err, exc=True)
177   return master_netdev, master_ip, master_node
178
179
180 def StartMaster(start_daemons):
181   """Activate local node as master node.
182
183   The function will always try activate the IP address of the master
184   (unless someone else has it). It will also start the master daemons,
185   based on the start_daemons parameter.
186
187   @type start_daemons: boolean
188   @param start_daemons: whether to also start the master
189       daemons (ganeti-masterd and ganeti-rapi)
190   @rtype: None
191
192   """
193   # GetMasterInfo will raise an exception if not able to return data
194   master_netdev, master_ip, _ = GetMasterInfo()
195
196   payload = []
197   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
198     if utils.OwnIpAddress(master_ip):
199       # we already have the ip:
200       logging.debug("Master IP already configured, doing nothing")
201     else:
202       msg = "Someone else has the master ip, not activating"
203       logging.error(msg)
204       payload.append(msg)
205   else:
206     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
207                            "dev", master_netdev, "label",
208                            "%s:0" % master_netdev])
209     if result.failed:
210       msg = "Can't activate master IP: %s" % result.output
211       logging.error(msg)
212       payload.append(msg)
213
214     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
215                            "-s", master_ip, master_ip])
216     # we'll ignore the exit code of arping
217
218   # and now start the master and rapi daemons
219   if start_daemons:
220     for daemon in 'ganeti-masterd', 'ganeti-rapi':
221       result = utils.RunCmd([daemon])
222       if result.failed:
223         msg = "Can't start daemon %s: %s" % (daemon, result.output)
224         logging.error(msg)
225         payload.append(msg)
226
227   if payload:
228     _Fail("; ".join(payload))
229
230
231 def StopMaster(stop_daemons):
232   """Deactivate this node as master.
233
234   The function will always try to deactivate the IP address of the
235   master. It will also stop the master daemons depending on the
236   stop_daemons parameter.
237
238   @type stop_daemons: boolean
239   @param stop_daemons: whether to also stop the master daemons
240       (ganeti-masterd and ganeti-rapi)
241   @rtype: None
242
243   """
244   # TODO: log and report back to the caller the error failures; we
245   # need to decide in which case we fail the RPC for this
246
247   # GetMasterInfo will raise an exception if not able to return data
248   master_netdev, master_ip, _ = GetMasterInfo()
249
250   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
251                          "dev", master_netdev])
252   if result.failed:
253     logging.error("Can't remove the master IP, error: %s", result.output)
254     # but otherwise ignore the failure
255
256   if stop_daemons:
257     # stop/kill the rapi and the master daemon
258     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
259       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
260
261
262 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
263   """Joins this node to the cluster.
264
265   This does the following:
266       - updates the hostkeys of the machine (rsa and dsa)
267       - adds the ssh private key to the user
268       - adds the ssh public key to the users' authorized_keys file
269
270   @type dsa: str
271   @param dsa: the DSA private key to write
272   @type dsapub: str
273   @param dsapub: the DSA public key to write
274   @type rsa: str
275   @param rsa: the RSA private key to write
276   @type rsapub: str
277   @param rsapub: the RSA public key to write
278   @type sshkey: str
279   @param sshkey: the SSH private key to write
280   @type sshpub: str
281   @param sshpub: the SSH public key to write
282   @rtype: boolean
283   @return: the success of the operation
284
285   """
286   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
287                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
288                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
289                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
290   for name, content, mode in sshd_keys:
291     utils.WriteFile(name, data=content, mode=mode)
292
293   try:
294     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
295                                                     mkdir=True)
296   except errors.OpExecError, err:
297     _Fail("Error while processing user ssh files: %s", err, exc=True)
298
299   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
300     utils.WriteFile(name, data=content, mode=0600)
301
302   utils.AddAuthorizedKey(auth_keys, sshpub)
303
304   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
305
306
307 def LeaveCluster():
308   """Cleans up and remove the current node.
309
310   This function cleans up and prepares the current node to be removed
311   from the cluster.
312
313   If processing is successful, then it raises an
314   L{errors.QuitGanetiException} which is used as a special case to
315   shutdown the node daemon.
316
317   """
318   _CleanDirectory(constants.DATA_DIR)
319   JobQueuePurge()
320
321   try:
322     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
323
324     f = open(pub_key, 'r')
325     try:
326       utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
327     finally:
328       f.close()
329
330     utils.RemoveFile(priv_key)
331     utils.RemoveFile(pub_key)
332   except errors.OpExecError:
333     logging.exception("Error while processing ssh files")
334
335   # Raise a custom exception (handled in ganeti-noded)
336   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
337
338
339 def GetNodeInfo(vgname, hypervisor_type):
340   """Gives back a hash with different informations about the node.
341
342   @type vgname: C{string}
343   @param vgname: the name of the volume group to ask for disk space information
344   @type hypervisor_type: C{str}
345   @param hypervisor_type: the name of the hypervisor to ask for
346       memory information
347   @rtype: C{dict}
348   @return: dictionary with the following keys:
349       - vg_size is the size of the configured volume group in MiB
350       - vg_free is the free size of the volume group in MiB
351       - memory_dom0 is the memory allocated for domain0 in MiB
352       - memory_free is the currently available (free) ram in MiB
353       - memory_total is the total number of ram in MiB
354
355   """
356   outputarray = {}
357   vginfo = _GetVGInfo(vgname)
358   outputarray['vg_size'] = vginfo['vg_size']
359   outputarray['vg_free'] = vginfo['vg_free']
360
361   hyper = hypervisor.GetHypervisor(hypervisor_type)
362   hyp_info = hyper.GetNodeInfo()
363   if hyp_info is not None:
364     outputarray.update(hyp_info)
365
366   f = open("/proc/sys/kernel/random/boot_id", 'r')
367   try:
368     outputarray["bootid"] = f.read(128).rstrip("\n")
369   finally:
370     f.close()
371
372   return outputarray
373
374
375 def VerifyNode(what, cluster_name):
376   """Verify the status of the local node.
377
378   Based on the input L{what} parameter, various checks are done on the
379   local node.
380
381   If the I{filelist} key is present, this list of
382   files is checksummed and the file/checksum pairs are returned.
383
384   If the I{nodelist} key is present, we check that we have
385   connectivity via ssh with the target nodes (and check the hostname
386   report).
387
388   If the I{node-net-test} key is present, we check that we have
389   connectivity to the given nodes via both primary IP and, if
390   applicable, secondary IPs.
391
392   @type what: C{dict}
393   @param what: a dictionary of things to check:
394       - filelist: list of files for which to compute checksums
395       - nodelist: list of nodes we should check ssh communication with
396       - node-net-test: list of nodes we should check node daemon port
397         connectivity with
398       - hypervisor: list with hypervisors to run the verify for
399   @rtype: dict
400   @return: a dictionary with the same keys as the input dict, and
401       values representing the result of the checks
402
403   """
404   result = {}
405
406   if constants.NV_HYPERVISOR in what:
407     result[constants.NV_HYPERVISOR] = tmp = {}
408     for hv_name in what[constants.NV_HYPERVISOR]:
409       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
410
411   if constants.NV_FILELIST in what:
412     result[constants.NV_FILELIST] = utils.FingerprintFiles(
413       what[constants.NV_FILELIST])
414
415   if constants.NV_NODELIST in what:
416     result[constants.NV_NODELIST] = tmp = {}
417     random.shuffle(what[constants.NV_NODELIST])
418     for node in what[constants.NV_NODELIST]:
419       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
420       if not success:
421         tmp[node] = message
422
423   if constants.NV_NODENETTEST in what:
424     result[constants.NV_NODENETTEST] = tmp = {}
425     my_name = utils.HostInfo().name
426     my_pip = my_sip = None
427     for name, pip, sip in what[constants.NV_NODENETTEST]:
428       if name == my_name:
429         my_pip = pip
430         my_sip = sip
431         break
432     if not my_pip:
433       tmp[my_name] = ("Can't find my own primary/secondary IP"
434                       " in the node list")
435     else:
436       port = utils.GetNodeDaemonPort()
437       for name, pip, sip in what[constants.NV_NODENETTEST]:
438         fail = []
439         if not utils.TcpPing(pip, port, source=my_pip):
440           fail.append("primary")
441         if sip != pip:
442           if not utils.TcpPing(sip, port, source=my_sip):
443             fail.append("secondary")
444         if fail:
445           tmp[name] = ("failure using the %s interface(s)" %
446                        " and ".join(fail))
447
448   if constants.NV_LVLIST in what:
449     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
450
451   if constants.NV_INSTANCELIST in what:
452     result[constants.NV_INSTANCELIST] = GetInstanceList(
453       what[constants.NV_INSTANCELIST])
454
455   if constants.NV_VGLIST in what:
456     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
457
458   if constants.NV_VERSION in what:
459     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
460                                     constants.RELEASE_VERSION)
461
462   if constants.NV_HVINFO in what:
463     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
464     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
465
466   if constants.NV_DRBDLIST in what:
467     try:
468       used_minors = bdev.DRBD8.GetUsedDevs().keys()
469     except errors.BlockDeviceError, err:
470       logging.warning("Can't get used minors list", exc_info=True)
471       used_minors = str(err)
472     result[constants.NV_DRBDLIST] = used_minors
473
474   return result
475
476
477 def GetVolumeList(vg_name):
478   """Compute list of logical volumes and their size.
479
480   @type vg_name: str
481   @param vg_name: the volume group whose LVs we should list
482   @rtype: dict
483   @return:
484       dictionary of all partions (key) with value being a tuple of
485       their size (in MiB), inactive and online status::
486
487         {'test1': ('20.06', True, True)}
488
489       in case of errors, a string is returned with the error
490       details.
491
492   """
493   lvs = {}
494   sep = '|'
495   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
496                          "--separator=%s" % sep,
497                          "-olv_name,lv_size,lv_attr", vg_name])
498   if result.failed:
499     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
500
501   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
502   for line in result.stdout.splitlines():
503     line = line.strip()
504     match = valid_line_re.match(line)
505     if not match:
506       logging.error("Invalid line returned from lvs output: '%s'", line)
507       continue
508     name, size, attr = match.groups()
509     inactive = attr[4] == '-'
510     online = attr[5] == 'o'
511     lvs[name] = (size, inactive, online)
512
513   return lvs
514
515
516 def ListVolumeGroups():
517   """List the volume groups and their size.
518
519   @rtype: dict
520   @return: dictionary with keys volume name and values the
521       size of the volume
522
523   """
524   return utils.ListVolumeGroups()
525
526
527 def NodeVolumes():
528   """List all volumes on this node.
529
530   @rtype: list
531   @return:
532     A list of dictionaries, each having four keys:
533       - name: the logical volume name,
534       - size: the size of the logical volume
535       - dev: the physical device on which the LV lives
536       - vg: the volume group to which it belongs
537
538     In case of errors, we return an empty list and log the
539     error.
540
541     Note that since a logical volume can live on multiple physical
542     volumes, the resulting list might include a logical volume
543     multiple times.
544
545   """
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=|",
548                          "--options=lv_name,lv_size,devices,vg_name"])
549   if result.failed:
550     _Fail("Failed to list logical volumes, lvs output: %s",
551           result.output)
552
553   def parse_dev(dev):
554     if '(' in dev:
555       return dev.split('(')[0]
556     else:
557       return dev
558
559   def map_line(line):
560     return {
561       'name': line[0].strip(),
562       'size': line[1].strip(),
563       'dev': parse_dev(line[2].strip()),
564       'vg': line[3].strip(),
565     }
566
567   return [map_line(line.split('|')) for line in result.stdout.splitlines()
568           if line.count('|') >= 3]
569
570
571 def BridgesExist(bridges_list):
572   """Check if a list of bridges exist on the current node.
573
574   @rtype: boolean
575   @return: C{True} if all of them exist, C{False} otherwise
576
577   """
578   missing = []
579   for bridge in bridges_list:
580     if not utils.BridgeExists(bridge):
581       missing.append(bridge)
582
583   if missing:
584     _Fail("Missing bridges %s", ", ".join(missing))
585
586
587 def GetInstanceList(hypervisor_list):
588   """Provides a list of instances.
589
590   @type hypervisor_list: list
591   @param hypervisor_list: the list of hypervisors to query information
592
593   @rtype: list
594   @return: a list of all running instances on the current node
595     - instance1.example.com
596     - instance2.example.com
597
598   """
599   results = []
600   for hname in hypervisor_list:
601     try:
602       names = hypervisor.GetHypervisor(hname).ListInstances()
603       results.extend(names)
604     except errors.HypervisorError, err:
605       _Fail("Error enumerating instances (hypervisor %s): %s",
606             hname, err, exc=True)
607
608   return results
609
610
611 def GetInstanceInfo(instance, hname):
612   """Gives back the informations about an instance as a dictionary.
613
614   @type instance: string
615   @param instance: the instance name
616   @type hname: string
617   @param hname: the hypervisor type of the instance
618
619   @rtype: dict
620   @return: dictionary with the following keys:
621       - memory: memory size of instance (int)
622       - state: xen state of instance (string)
623       - time: cpu time of instance (float)
624
625   """
626   output = {}
627
628   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
629   if iinfo is not None:
630     output['memory'] = iinfo[2]
631     output['state'] = iinfo[4]
632     output['time'] = iinfo[5]
633
634   return output
635
636
637 def GetInstanceMigratable(instance):
638   """Gives whether an instance can be migrated.
639
640   @type instance: L{objects.Instance}
641   @param instance: object representing the instance to be checked.
642
643   @rtype: tuple
644   @return: tuple of (result, description) where:
645       - result: whether the instance can be migrated or not
646       - description: a description of the issue, if relevant
647
648   """
649   hyper = hypervisor.GetHypervisor(instance.hypervisor)
650   iname = instance.name
651   if iname not in hyper.ListInstances():
652     _Fail("Instance %s is not running", iname)
653
654   for idx in range(len(instance.disks)):
655     link_name = _GetBlockDevSymlinkPath(iname, idx)
656     if not os.path.islink(link_name):
657       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
658
659
660 def GetAllInstancesInfo(hypervisor_list):
661   """Gather data about all instances.
662
663   This is the equivalent of L{GetInstanceInfo}, except that it
664   computes data for all instances at once, thus being faster if one
665   needs data about more than one instance.
666
667   @type hypervisor_list: list
668   @param hypervisor_list: list of hypervisors to query for instance data
669
670   @rtype: dict
671   @return: dictionary of instance: data, with data having the following keys:
672       - memory: memory size of instance (int)
673       - state: xen state of instance (string)
674       - time: cpu time of instance (float)
675       - vcpus: the number of vcpus
676
677   """
678   output = {}
679
680   for hname in hypervisor_list:
681     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
682     if iinfo:
683       for name, _, memory, vcpus, state, times in iinfo:
684         value = {
685           'memory': memory,
686           'vcpus': vcpus,
687           'state': state,
688           'time': times,
689           }
690         if name in output:
691           # we only check static parameters, like memory and vcpus,
692           # and not state and time which can change between the
693           # invocations of the different hypervisors
694           for key in 'memory', 'vcpus':
695             if value[key] != output[name][key]:
696               _Fail("Instance %s is running twice"
697                     " with different parameters", name)
698         output[name] = value
699
700   return output
701
702
703 def InstanceOsAdd(instance, reinstall):
704   """Add an OS to an instance.
705
706   @type instance: L{objects.Instance}
707   @param instance: Instance whose OS is to be installed
708   @type reinstall: boolean
709   @param reinstall: whether this is an instance reinstall
710   @rtype: None
711
712   """
713   inst_os = OSFromDisk(instance.os)
714
715   create_env = OSEnvironment(instance)
716   if reinstall:
717     create_env['INSTANCE_REINSTALL'] = "1"
718
719   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
720                                      instance.name, int(time.time()))
721
722   result = utils.RunCmd([inst_os.create_script], env=create_env,
723                         cwd=inst_os.path, output=logfile,)
724   if result.failed:
725     logging.error("os create command '%s' returned error: %s, logfile: %s,"
726                   " output: %s", result.cmd, result.fail_reason, logfile,
727                   result.output)
728     lines = [utils.SafeEncode(val)
729              for val in utils.TailFile(logfile, lines=20)]
730     _Fail("OS create script failed (%s), last lines in the"
731           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
732
733
734 def RunRenameInstance(instance, old_name):
735   """Run the OS rename script for an instance.
736
737   @type instance: L{objects.Instance}
738   @param instance: Instance whose OS is to be installed
739   @type old_name: string
740   @param old_name: previous instance name
741   @rtype: boolean
742   @return: the success of the operation
743
744   """
745   inst_os = OSFromDisk(instance.os)
746
747   rename_env = OSEnvironment(instance)
748   rename_env['OLD_INSTANCE_NAME'] = old_name
749
750   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
751                                            old_name,
752                                            instance.name, int(time.time()))
753
754   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
755                         cwd=inst_os.path, output=logfile)
756
757   if result.failed:
758     logging.error("os create command '%s' returned error: %s output: %s",
759                   result.cmd, result.fail_reason, result.output)
760     lines = [utils.SafeEncode(val)
761              for val in utils.TailFile(logfile, lines=20)]
762     _Fail("OS rename script failed (%s), last lines in the"
763           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
764
765
766 def _GetVGInfo(vg_name):
767   """Get informations about the volume group.
768
769   @type vg_name: str
770   @param vg_name: the volume group which we query
771   @rtype: dict
772   @return:
773     A dictionary with the following keys:
774       - C{vg_size} is the total size of the volume group in MiB
775       - C{vg_free} is the free size of the volume group in MiB
776       - C{pv_count} are the number of physical disks in that VG
777
778     If an error occurs during gathering of data, we return the same dict
779     with keys all set to None.
780
781   """
782   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
783
784   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
785                          "--nosuffix", "--units=m", "--separator=:", vg_name])
786
787   if retval.failed:
788     logging.error("volume group %s not present", vg_name)
789     return retdic
790   valarr = retval.stdout.strip().rstrip(':').split(':')
791   if len(valarr) == 3:
792     try:
793       retdic = {
794         "vg_size": int(round(float(valarr[0]), 0)),
795         "vg_free": int(round(float(valarr[1]), 0)),
796         "pv_count": int(valarr[2]),
797         }
798     except ValueError, err:
799       logging.exception("Fail to parse vgs output: %s", err)
800   else:
801     logging.error("vgs output has the wrong number of fields (expected"
802                   " three): %s", str(valarr))
803   return retdic
804
805
806 def _GetBlockDevSymlinkPath(instance_name, idx):
807   return os.path.join(constants.DISK_LINKS_DIR,
808                       "%s:%d" % (instance_name, idx))
809
810
811 def _SymlinkBlockDev(instance_name, device_path, idx):
812   """Set up symlinks to a instance's block device.
813
814   This is an auxiliary function run when an instance is start (on the primary
815   node) or when an instance is migrated (on the target node).
816
817
818   @param instance_name: the name of the target instance
819   @param device_path: path of the physical block device, on the node
820   @param idx: the disk index
821   @return: absolute path to the disk's symlink
822
823   """
824   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
825   try:
826     os.symlink(device_path, link_name)
827   except OSError, err:
828     if err.errno == errno.EEXIST:
829       if (not os.path.islink(link_name) or
830           os.readlink(link_name) != device_path):
831         os.remove(link_name)
832         os.symlink(device_path, link_name)
833     else:
834       raise
835
836   return link_name
837
838
839 def _RemoveBlockDevLinks(instance_name, disks):
840   """Remove the block device symlinks belonging to the given instance.
841
842   """
843   for idx, _ in enumerate(disks):
844     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
845     if os.path.islink(link_name):
846       try:
847         os.remove(link_name)
848       except OSError:
849         logging.exception("Can't remove symlink '%s'", link_name)
850
851
852 def _GatherAndLinkBlockDevs(instance):
853   """Set up an instance's block device(s).
854
855   This is run on the primary node at instance startup. The block
856   devices must be already assembled.
857
858   @type instance: L{objects.Instance}
859   @param instance: the instance whose disks we shoul assemble
860   @rtype: list
861   @return: list of (disk_object, device_path)
862
863   """
864   block_devices = []
865   for idx, disk in enumerate(instance.disks):
866     device = _RecursiveFindBD(disk)
867     if device is None:
868       raise errors.BlockDeviceError("Block device '%s' is not set up." %
869                                     str(disk))
870     device.Open()
871     try:
872       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
873     except OSError, e:
874       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
875                                     e.strerror)
876
877     block_devices.append((disk, link_name))
878
879   return block_devices
880
881
882 def StartInstance(instance):
883   """Start an instance.
884
885   @type instance: L{objects.Instance}
886   @param instance: the instance object
887   @rtype: None
888
889   """
890   running_instances = GetInstanceList([instance.hypervisor])
891
892   if instance.name in running_instances:
893     logging.info("Instance %s already running, not starting", instance.name)
894     return
895
896   try:
897     block_devices = _GatherAndLinkBlockDevs(instance)
898     hyper = hypervisor.GetHypervisor(instance.hypervisor)
899     hyper.StartInstance(instance, block_devices)
900   except errors.BlockDeviceError, err:
901     _Fail("Block device error: %s", err, exc=True)
902   except errors.HypervisorError, err:
903     _RemoveBlockDevLinks(instance.name, instance.disks)
904     _Fail("Hypervisor error: %s", err, exc=True)
905
906
907 def InstanceShutdown(instance):
908   """Shut an instance down.
909
910   @note: this functions uses polling with a hardcoded timeout.
911
912   @type instance: L{objects.Instance}
913   @param instance: the instance object
914   @rtype: None
915
916   """
917   hv_name = instance.hypervisor
918   running_instances = GetInstanceList([hv_name])
919   iname = instance.name
920
921   if iname not in running_instances:
922     logging.info("Instance %s not running, doing nothing", iname)
923     return
924
925   hyper = hypervisor.GetHypervisor(hv_name)
926   try:
927     hyper.StopInstance(instance)
928   except errors.HypervisorError, err:
929     _Fail("Failed to stop instance %s: %s", iname, err)
930
931   # test every 10secs for 2min
932
933   time.sleep(1)
934   for dummy in range(11):
935     if instance.name not in GetInstanceList([hv_name]):
936       break
937     time.sleep(10)
938   else:
939     # the shutdown did not succeed
940     logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
941
942     try:
943       hyper.StopInstance(instance, force=True)
944     except errors.HypervisorError, err:
945       _Fail("Failed to force stop instance %s: %s", iname, err)
946
947     time.sleep(1)
948     if instance.name in GetInstanceList([hv_name]):
949       _Fail("Could not shutdown instance %s even by destroy", iname)
950
951   _RemoveBlockDevLinks(iname, instance.disks)
952
953
954 def InstanceReboot(instance, reboot_type):
955   """Reboot an instance.
956
957   @type instance: L{objects.Instance}
958   @param instance: the instance object to reboot
959   @type reboot_type: str
960   @param reboot_type: the type of reboot, one the following
961     constants:
962       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
963         instance OS, do not recreate the VM
964       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
965         restart the VM (at the hypervisor level)
966       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
967         is not accepted here, since that mode is handled
968         differently
969   @rtype: None
970
971   """
972   running_instances = GetInstanceList([instance.hypervisor])
973
974   if instance.name not in running_instances:
975     _Fail("Cannot reboot instance %s that is not running", instance.name)
976
977   hyper = hypervisor.GetHypervisor(instance.hypervisor)
978   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
979     try:
980       hyper.RebootInstance(instance)
981     except errors.HypervisorError, err:
982       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
983   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
984     try:
985       InstanceShutdown(instance)
986       return StartInstance(instance)
987     except errors.HypervisorError, err:
988       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
989   else:
990     _Fail("Invalid reboot_type received: %s", reboot_type)
991
992
993 def MigrationInfo(instance):
994   """Gather information about an instance to be migrated.
995
996   @type instance: L{objects.Instance}
997   @param instance: the instance definition
998
999   """
1000   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1001   try:
1002     info = hyper.MigrationInfo(instance)
1003   except errors.HypervisorError, err:
1004     _Fail("Failed to fetch migration information: %s", err, exc=True)
1005   return info
1006
1007
1008 def AcceptInstance(instance, info, target):
1009   """Prepare the node to accept an instance.
1010
1011   @type instance: L{objects.Instance}
1012   @param instance: the instance definition
1013   @type info: string/data (opaque)
1014   @param info: migration information, from the source node
1015   @type target: string
1016   @param target: target host (usually ip), on this node
1017
1018   """
1019   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1020   try:
1021     hyper.AcceptInstance(instance, info, target)
1022   except errors.HypervisorError, err:
1023     _Fail("Failed to accept instance: %s", err, exc=True)
1024
1025
1026 def FinalizeMigration(instance, info, success):
1027   """Finalize any preparation to accept an instance.
1028
1029   @type instance: L{objects.Instance}
1030   @param instance: the instance definition
1031   @type info: string/data (opaque)
1032   @param info: migration information, from the source node
1033   @type success: boolean
1034   @param success: whether the migration was a success or a failure
1035
1036   """
1037   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1038   try:
1039     hyper.FinalizeMigration(instance, info, success)
1040   except errors.HypervisorError, err:
1041     _Fail("Failed to finalize migration: %s", err, exc=True)
1042
1043
1044 def MigrateInstance(instance, target, live):
1045   """Migrates an instance to another node.
1046
1047   @type instance: L{objects.Instance}
1048   @param instance: the instance definition
1049   @type target: string
1050   @param target: the target node name
1051   @type live: boolean
1052   @param live: whether the migration should be done live or not (the
1053       interpretation of this parameter is left to the hypervisor)
1054   @rtype: tuple
1055   @return: a tuple of (success, msg) where:
1056       - succes is a boolean denoting the success/failure of the operation
1057       - msg is a string with details in case of failure
1058
1059   """
1060   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1061
1062   try:
1063     hyper.MigrateInstance(instance.name, target, live)
1064   except errors.HypervisorError, err:
1065     _Fail("Failed to migrate instance: %s", err, exc=True)
1066
1067
1068 def BlockdevCreate(disk, size, owner, on_primary, info):
1069   """Creates a block device for an instance.
1070
1071   @type disk: L{objects.Disk}
1072   @param disk: the object describing the disk we should create
1073   @type size: int
1074   @param size: the size of the physical underlying device, in MiB
1075   @type owner: str
1076   @param owner: the name of the instance for which disk is created,
1077       used for device cache data
1078   @type on_primary: boolean
1079   @param on_primary:  indicates if it is the primary node or not
1080   @type info: string
1081   @param info: string that will be sent to the physical device
1082       creation, used for example to set (LVM) tags on LVs
1083
1084   @return: the new unique_id of the device (this can sometime be
1085       computed only after creation), or None. On secondary nodes,
1086       it's not required to return anything.
1087
1088   """
1089   clist = []
1090   if disk.children:
1091     for child in disk.children:
1092       try:
1093         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1094       except errors.BlockDeviceError, err:
1095         _Fail("Can't assemble device %s: %s", child, err)
1096       if on_primary or disk.AssembleOnSecondary():
1097         # we need the children open in case the device itself has to
1098         # be assembled
1099         try:
1100           crdev.Open()
1101         except errors.BlockDeviceError, err:
1102           _Fail("Can't make child '%s' read-write: %s", child, err)
1103       clist.append(crdev)
1104
1105   try:
1106     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1107   except errors.BlockDeviceError, err:
1108     _Fail("Can't create block device: %s", err)
1109
1110   if on_primary or disk.AssembleOnSecondary():
1111     try:
1112       device.Assemble()
1113     except errors.BlockDeviceError, err:
1114       _Fail("Can't assemble device after creation, unusual event: %s", err)
1115     device.SetSyncSpeed(constants.SYNC_SPEED)
1116     if on_primary or disk.OpenOnSecondary():
1117       try:
1118         device.Open(force=True)
1119       except errors.BlockDeviceError, err:
1120         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1121     DevCacheManager.UpdateCache(device.dev_path, owner,
1122                                 on_primary, disk.iv_name)
1123
1124   device.SetInfo(info)
1125
1126   return device.unique_id
1127
1128
1129 def BlockdevRemove(disk):
1130   """Remove a block device.
1131
1132   @note: This is intended to be called recursively.
1133
1134   @type disk: L{objects.Disk}
1135   @param disk: the disk object we should remove
1136   @rtype: boolean
1137   @return: the success of the operation
1138
1139   """
1140   msgs = []
1141   try:
1142     rdev = _RecursiveFindBD(disk)
1143   except errors.BlockDeviceError, err:
1144     # probably can't attach
1145     logging.info("Can't attach to device %s in remove", disk)
1146     rdev = None
1147   if rdev is not None:
1148     r_path = rdev.dev_path
1149     try:
1150       rdev.Remove()
1151     except errors.BlockDeviceError, err:
1152       msgs.append(str(err))
1153     if not msgs:
1154       DevCacheManager.RemoveCache(r_path)
1155
1156   if disk.children:
1157     for child in disk.children:
1158       try:
1159         BlockdevRemove(child)
1160       except RPCFail, err:
1161         msgs.append(str(err))
1162
1163   if msgs:
1164     _Fail("; ".join(msgs))
1165
1166
1167 def _RecursiveAssembleBD(disk, owner, as_primary):
1168   """Activate a block device for an instance.
1169
1170   This is run on the primary and secondary nodes for an instance.
1171
1172   @note: this function is called recursively.
1173
1174   @type disk: L{objects.Disk}
1175   @param disk: the disk we try to assemble
1176   @type owner: str
1177   @param owner: the name of the instance which owns the disk
1178   @type as_primary: boolean
1179   @param as_primary: if we should make the block device
1180       read/write
1181
1182   @return: the assembled device or None (in case no device
1183       was assembled)
1184   @raise errors.BlockDeviceError: in case there is an error
1185       during the activation of the children or the device
1186       itself
1187
1188   """
1189   children = []
1190   if disk.children:
1191     mcn = disk.ChildrenNeeded()
1192     if mcn == -1:
1193       mcn = 0 # max number of Nones allowed
1194     else:
1195       mcn = len(disk.children) - mcn # max number of Nones
1196     for chld_disk in disk.children:
1197       try:
1198         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1199       except errors.BlockDeviceError, err:
1200         if children.count(None) >= mcn:
1201           raise
1202         cdev = None
1203         logging.error("Error in child activation (but continuing): %s",
1204                       str(err))
1205       children.append(cdev)
1206
1207   if as_primary or disk.AssembleOnSecondary():
1208     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1209     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1210     result = r_dev
1211     if as_primary or disk.OpenOnSecondary():
1212       r_dev.Open()
1213     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1214                                 as_primary, disk.iv_name)
1215
1216   else:
1217     result = True
1218   return result
1219
1220
1221 def BlockdevAssemble(disk, owner, as_primary):
1222   """Activate a block device for an instance.
1223
1224   This is a wrapper over _RecursiveAssembleBD.
1225
1226   @rtype: str or boolean
1227   @return: a C{/dev/...} path for primary nodes, and
1228       C{True} for secondary nodes
1229
1230   """
1231   try:
1232     result = _RecursiveAssembleBD(disk, owner, as_primary)
1233     if isinstance(result, bdev.BlockDev):
1234       result = result.dev_path
1235   except errors.BlockDeviceError, err:
1236     _Fail("Error while assembling disk: %s", err, exc=True)
1237
1238   return result
1239
1240
1241 def BlockdevShutdown(disk):
1242   """Shut down a block device.
1243
1244   First, if the device is assembled (Attach() is successfull), then
1245   the device is shutdown. Then the children of the device are
1246   shutdown.
1247
1248   This function is called recursively. Note that we don't cache the
1249   children or such, as oppossed to assemble, shutdown of different
1250   devices doesn't require that the upper device was active.
1251
1252   @type disk: L{objects.Disk}
1253   @param disk: the description of the disk we should
1254       shutdown
1255   @rtype: None
1256
1257   """
1258   msgs = []
1259   r_dev = _RecursiveFindBD(disk)
1260   if r_dev is not None:
1261     r_path = r_dev.dev_path
1262     try:
1263       r_dev.Shutdown()
1264       DevCacheManager.RemoveCache(r_path)
1265     except errors.BlockDeviceError, err:
1266       msgs.append(str(err))
1267
1268   if disk.children:
1269     for child in disk.children:
1270       try:
1271         BlockdevShutdown(child)
1272       except RPCFail, err:
1273         msgs.append(str(err))
1274
1275   if msgs:
1276     _Fail("; ".join(msgs))
1277
1278
1279 def BlockdevAddchildren(parent_cdev, new_cdevs):
1280   """Extend a mirrored block device.
1281
1282   @type parent_cdev: L{objects.Disk}
1283   @param parent_cdev: the disk to which we should add children
1284   @type new_cdevs: list of L{objects.Disk}
1285   @param new_cdevs: the list of children which we should add
1286   @rtype: None
1287
1288   """
1289   parent_bdev = _RecursiveFindBD(parent_cdev)
1290   if parent_bdev is None:
1291     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1292   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1293   if new_bdevs.count(None) > 0:
1294     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1295   parent_bdev.AddChildren(new_bdevs)
1296
1297
1298 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1299   """Shrink a mirrored block device.
1300
1301   @type parent_cdev: L{objects.Disk}
1302   @param parent_cdev: the disk from which we should remove children
1303   @type new_cdevs: list of L{objects.Disk}
1304   @param new_cdevs: the list of children which we should remove
1305   @rtype: None
1306
1307   """
1308   parent_bdev = _RecursiveFindBD(parent_cdev)
1309   if parent_bdev is None:
1310     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1311   devs = []
1312   for disk in new_cdevs:
1313     rpath = disk.StaticDevPath()
1314     if rpath is None:
1315       bd = _RecursiveFindBD(disk)
1316       if bd is None:
1317         _Fail("Can't find device %s while removing children", disk)
1318       else:
1319         devs.append(bd.dev_path)
1320     else:
1321       devs.append(rpath)
1322   parent_bdev.RemoveChildren(devs)
1323
1324
1325 def BlockdevGetmirrorstatus(disks):
1326   """Get the mirroring status of a list of devices.
1327
1328   @type disks: list of L{objects.Disk}
1329   @param disks: the list of disks which we should query
1330   @rtype: disk
1331   @return:
1332       a list of (mirror_done, estimated_time) tuples, which
1333       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1334   @raise errors.BlockDeviceError: if any of the disks cannot be
1335       found
1336
1337   """
1338   stats = []
1339   for dsk in disks:
1340     rbd = _RecursiveFindBD(dsk)
1341     if rbd is None:
1342       _Fail("Can't find device %s", dsk)
1343     stats.append(rbd.CombinedSyncStatus())
1344   return stats
1345
1346
1347 def _RecursiveFindBD(disk):
1348   """Check if a device is activated.
1349
1350   If so, return informations about the real device.
1351
1352   @type disk: L{objects.Disk}
1353   @param disk: the disk object we need to find
1354
1355   @return: None if the device can't be found,
1356       otherwise the device instance
1357
1358   """
1359   children = []
1360   if disk.children:
1361     for chdisk in disk.children:
1362       children.append(_RecursiveFindBD(chdisk))
1363
1364   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1365
1366
1367 def BlockdevFind(disk):
1368   """Check if a device is activated.
1369
1370   If it is, return informations about the real device.
1371
1372   @type disk: L{objects.Disk}
1373   @param disk: the disk to find
1374   @rtype: None or tuple
1375   @return: None if the disk cannot be found, otherwise a
1376       tuple (device_path, major, minor, sync_percent,
1377       estimated_time, is_degraded)
1378
1379   """
1380   try:
1381     rbd = _RecursiveFindBD(disk)
1382   except errors.BlockDeviceError, err:
1383     _Fail("Failed to find device: %s", err, exc=True)
1384   if rbd is None:
1385     return None
1386   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1387
1388
1389 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1390   """Write a file to the filesystem.
1391
1392   This allows the master to overwrite(!) a file. It will only perform
1393   the operation if the file belongs to a list of configuration files.
1394
1395   @type file_name: str
1396   @param file_name: the target file name
1397   @type data: str
1398   @param data: the new contents of the file
1399   @type mode: int
1400   @param mode: the mode to give the file (can be None)
1401   @type uid: int
1402   @param uid: the owner of the file (can be -1 for default)
1403   @type gid: int
1404   @param gid: the group of the file (can be -1 for default)
1405   @type atime: float
1406   @param atime: the atime to set on the file (can be None)
1407   @type mtime: float
1408   @param mtime: the mtime to set on the file (can be None)
1409   @rtype: None
1410
1411   """
1412   if not os.path.isabs(file_name):
1413     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1414
1415   allowed_files = set([
1416     constants.CLUSTER_CONF_FILE,
1417     constants.ETC_HOSTS,
1418     constants.SSH_KNOWN_HOSTS_FILE,
1419     constants.VNC_PASSWORD_FILE,
1420     constants.RAPI_CERT_FILE,
1421     constants.RAPI_USERS_FILE,
1422     ])
1423
1424   for hv_name in constants.HYPER_TYPES:
1425     hv_class = hypervisor.GetHypervisor(hv_name)
1426     allowed_files.update(hv_class.GetAncillaryFiles())
1427
1428   if file_name not in allowed_files:
1429     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1430           file_name)
1431
1432   raw_data = _Decompress(data)
1433
1434   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1435                   atime=atime, mtime=mtime)
1436
1437
1438 def WriteSsconfFiles(values):
1439   """Update all ssconf files.
1440
1441   Wrapper around the SimpleStore.WriteFiles.
1442
1443   """
1444   ssconf.SimpleStore().WriteFiles(values)
1445
1446
1447 def _ErrnoOrStr(err):
1448   """Format an EnvironmentError exception.
1449
1450   If the L{err} argument has an errno attribute, it will be looked up
1451   and converted into a textual C{E...} description. Otherwise the
1452   string representation of the error will be returned.
1453
1454   @type err: L{EnvironmentError}
1455   @param err: the exception to format
1456
1457   """
1458   if hasattr(err, 'errno'):
1459     detail = errno.errorcode[err.errno]
1460   else:
1461     detail = str(err)
1462   return detail
1463
1464
1465 def _OSOndiskVersion(name, os_dir):
1466   """Compute and return the API version of a given OS.
1467
1468   This function will try to read the API version of the OS given by
1469   the 'name' parameter and residing in the 'os_dir' directory.
1470
1471   @type name: str
1472   @param name: the OS name we should look for
1473   @type os_dir: str
1474   @param os_dir: the directory inwhich we should look for the OS
1475   @rtype: tuple
1476   @return: tuple (status, data) with status denoting the validity and
1477       data holding either the vaid versions or an error message
1478
1479   """
1480   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1481
1482   try:
1483     st = os.stat(api_file)
1484   except EnvironmentError, err:
1485     return False, ("Required file 'ganeti_api_version' file not"
1486                    " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
1487
1488   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1489     return False, ("File 'ganeti_api_version' file at %s is not"
1490                    " a regular file" % os_dir)
1491
1492   try:
1493     f = open(api_file)
1494     try:
1495       api_versions = f.readlines()
1496     finally:
1497       f.close()
1498   except EnvironmentError, err:
1499     return False, ("Error while reading the API version file at %s: %s" %
1500                    (api_file, _ErrnoOrStr(err)))
1501
1502   api_versions = [version.strip() for version in api_versions]
1503   try:
1504     api_versions = [int(version) for version in api_versions]
1505   except (TypeError, ValueError), err:
1506     return False, ("API version(s) can't be converted to integer: %s" %
1507                    str(err))
1508
1509   return True, api_versions
1510
1511
1512 def DiagnoseOS(top_dirs=None):
1513   """Compute the validity for all OSes.
1514
1515   @type top_dirs: list
1516   @param top_dirs: the list of directories in which to
1517       search (if not given defaults to
1518       L{constants.OS_SEARCH_PATH})
1519   @rtype: list of L{objects.OS}
1520   @return: a list of tuples (name, path, status, diagnose)
1521       for all (potential) OSes under all search paths, where:
1522           - name is the (potential) OS name
1523           - path is the full path to the OS
1524           - status True/False is the validity of the OS
1525           - diagnose is the error message for an invalid OS, otherwise empty
1526
1527   """
1528   if top_dirs is None:
1529     top_dirs = constants.OS_SEARCH_PATH
1530
1531   result = []
1532   for dir_name in top_dirs:
1533     if os.path.isdir(dir_name):
1534       try:
1535         f_names = utils.ListVisibleFiles(dir_name)
1536       except EnvironmentError, err:
1537         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1538         break
1539       for name in f_names:
1540         os_path = os.path.sep.join([dir_name, name])
1541         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1542         if status:
1543           diagnose = ""
1544         else:
1545           diagnose = os_inst
1546         result.append((name, os_path, status, diagnose))
1547
1548   return result
1549
1550
1551 def _TryOSFromDisk(name, base_dir=None):
1552   """Create an OS instance from disk.
1553
1554   This function will return an OS instance if the given name is a
1555   valid OS name.
1556
1557   @type base_dir: string
1558   @keyword base_dir: Base directory containing OS installations.
1559                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1560   @rtype: tuple
1561   @return: success and either the OS instance if we find a valid one,
1562       or error message
1563
1564   """
1565   if base_dir is None:
1566     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1567     if os_dir is None:
1568       return False, "Directory for OS %s not found in search path" % name
1569   else:
1570     os_dir = os.path.sep.join([base_dir, name])
1571
1572   status, api_versions = _OSOndiskVersion(name, os_dir)
1573   if not status:
1574     # push the error up
1575     return status, api_versions
1576
1577   if constants.OS_API_VERSION not in api_versions:
1578     return False, ("API version mismatch for path '%s': found %s, want %s." %
1579                    (os_dir, api_versions, constants.OS_API_VERSION))
1580
1581   # OS Scripts dictionary, we will populate it with the actual script names
1582   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1583
1584   for script in os_scripts:
1585     os_scripts[script] = os.path.sep.join([os_dir, script])
1586
1587     try:
1588       st = os.stat(os_scripts[script])
1589     except EnvironmentError, err:
1590       return False, ("Script '%s' under path '%s' is missing (%s)" %
1591                      (script, os_dir, _ErrnoOrStr(err)))
1592
1593     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1594       return False, ("Script '%s' under path '%s' is not executable" %
1595                      (script, os_dir))
1596
1597     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1598       return False, ("Script '%s' under path '%s' is not a regular file" %
1599                      (script, os_dir))
1600
1601   os_obj = objects.OS(name=name, path=os_dir,
1602                       create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1603                       export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1604                       import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1605                       rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1606                       api_versions=api_versions)
1607   return True, os_obj
1608
1609
1610 def OSFromDisk(name, base_dir=None):
1611   """Create an OS instance from disk.
1612
1613   This function will return an OS instance if the given name is a
1614   valid OS name. Otherwise, it will raise an appropriate
1615   L{RPCFail} exception, detailing why this is not a valid OS.
1616
1617   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1618   an exception but returns true/false status data.
1619
1620   @type base_dir: string
1621   @keyword base_dir: Base directory containing OS installations.
1622                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1623   @rtype: L{objects.OS}
1624   @return: the OS instance if we find a valid one
1625   @raise RPCFail: if we don't find a valid OS
1626
1627   """
1628   status, payload = _TryOSFromDisk(name, base_dir)
1629
1630   if not status:
1631     _Fail(payload)
1632
1633   return payload
1634
1635
1636 def OSEnvironment(instance, debug=0):
1637   """Calculate the environment for an os script.
1638
1639   @type instance: L{objects.Instance}
1640   @param instance: target instance for the os script run
1641   @type debug: integer
1642   @param debug: debug level (0 or 1, for OS Api 10)
1643   @rtype: dict
1644   @return: dict of environment variables
1645   @raise errors.BlockDeviceError: if the block device
1646       cannot be found
1647
1648   """
1649   result = {}
1650   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1651   result['INSTANCE_NAME'] = instance.name
1652   result['INSTANCE_OS'] = instance.os
1653   result['HYPERVISOR'] = instance.hypervisor
1654   result['DISK_COUNT'] = '%d' % len(instance.disks)
1655   result['NIC_COUNT'] = '%d' % len(instance.nics)
1656   result['DEBUG_LEVEL'] = '%d' % debug
1657   for idx, disk in enumerate(instance.disks):
1658     real_disk = _RecursiveFindBD(disk)
1659     if real_disk is None:
1660       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1661                                     str(disk))
1662     real_disk.Open()
1663     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1664     result['DISK_%d_ACCESS' % idx] = disk.mode
1665     if constants.HV_DISK_TYPE in instance.hvparams:
1666       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1667         instance.hvparams[constants.HV_DISK_TYPE]
1668     if disk.dev_type in constants.LDS_BLOCK:
1669       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1670     elif disk.dev_type == constants.LD_FILE:
1671       result['DISK_%d_BACKEND_TYPE' % idx] = \
1672         'file:%s' % disk.physical_id[0]
1673   for idx, nic in enumerate(instance.nics):
1674     result['NIC_%d_MAC' % idx] = nic.mac
1675     if nic.ip:
1676       result['NIC_%d_IP' % idx] = nic.ip
1677     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1678     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1679       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1680     if nic.nicparams[constants.NIC_LINK]:
1681       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1682     if constants.HV_NIC_TYPE in instance.hvparams:
1683       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1684         instance.hvparams[constants.HV_NIC_TYPE]
1685
1686   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1687     for key, value in source.items():
1688       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1689
1690   return result
1691
1692 def BlockdevGrow(disk, amount):
1693   """Grow a stack of block devices.
1694
1695   This function is called recursively, with the childrens being the
1696   first ones to resize.
1697
1698   @type disk: L{objects.Disk}
1699   @param disk: the disk to be grown
1700   @rtype: (status, result)
1701   @return: a tuple with the status of the operation
1702       (True/False), and the errors message if status
1703       is False
1704
1705   """
1706   r_dev = _RecursiveFindBD(disk)
1707   if r_dev is None:
1708     _Fail("Cannot find block device %s", disk)
1709
1710   try:
1711     r_dev.Grow(amount)
1712   except errors.BlockDeviceError, err:
1713     _Fail("Failed to grow block device: %s", err, exc=True)
1714
1715
1716 def BlockdevSnapshot(disk):
1717   """Create a snapshot copy of a block device.
1718
1719   This function is called recursively, and the snapshot is actually created
1720   just for the leaf lvm backend device.
1721
1722   @type disk: L{objects.Disk}
1723   @param disk: the disk to be snapshotted
1724   @rtype: string
1725   @return: snapshot disk path
1726
1727   """
1728   if disk.children:
1729     if len(disk.children) == 1:
1730       # only one child, let's recurse on it
1731       return BlockdevSnapshot(disk.children[0])
1732     else:
1733       # more than one child, choose one that matches
1734       for child in disk.children:
1735         if child.size == disk.size:
1736           # return implies breaking the loop
1737           return BlockdevSnapshot(child)
1738   elif disk.dev_type == constants.LD_LV:
1739     r_dev = _RecursiveFindBD(disk)
1740     if r_dev is not None:
1741       # let's stay on the safe side and ask for the full size, for now
1742       return r_dev.Snapshot(disk.size)
1743     else:
1744       _Fail("Cannot find block device %s", disk)
1745   else:
1746     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1747           disk.unique_id, disk.dev_type)
1748
1749
1750 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1751   """Export a block device snapshot to a remote node.
1752
1753   @type disk: L{objects.Disk}
1754   @param disk: the description of the disk to export
1755   @type dest_node: str
1756   @param dest_node: the destination node to export to
1757   @type instance: L{objects.Instance}
1758   @param instance: the instance object to whom the disk belongs
1759   @type cluster_name: str
1760   @param cluster_name: the cluster name, needed for SSH hostalias
1761   @type idx: int
1762   @param idx: the index of the disk in the instance's disk list,
1763       used to export to the OS scripts environment
1764   @rtype: None
1765
1766   """
1767   export_env = OSEnvironment(instance)
1768
1769   inst_os = OSFromDisk(instance.os)
1770   export_script = inst_os.export_script
1771
1772   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1773                                      instance.name, int(time.time()))
1774   if not os.path.exists(constants.LOG_OS_DIR):
1775     os.mkdir(constants.LOG_OS_DIR, 0750)
1776   real_disk = _RecursiveFindBD(disk)
1777   if real_disk is None:
1778     _Fail("Block device '%s' is not set up", disk)
1779
1780   real_disk.Open()
1781
1782   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1783   export_env['EXPORT_INDEX'] = str(idx)
1784
1785   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1786   destfile = disk.physical_id[1]
1787
1788   # the target command is built out of three individual commands,
1789   # which are joined by pipes; we check each individual command for
1790   # valid parameters
1791   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1792                                export_script, logfile)
1793
1794   comprcmd = "gzip"
1795
1796   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1797                                 destdir, destdir, destfile)
1798   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1799                                                    constants.GANETI_RUNAS,
1800                                                    destcmd)
1801
1802   # all commands have been checked, so we're safe to combine them
1803   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1804
1805   result = utils.RunCmd(command, env=export_env)
1806
1807   if result.failed:
1808     _Fail("OS snapshot export command '%s' returned error: %s"
1809           " output: %s", command, result.fail_reason, result.output)
1810
1811
1812 def FinalizeExport(instance, snap_disks):
1813   """Write out the export configuration information.
1814
1815   @type instance: L{objects.Instance}
1816   @param instance: the instance which we export, used for
1817       saving configuration
1818   @type snap_disks: list of L{objects.Disk}
1819   @param snap_disks: list of snapshot block devices, which
1820       will be used to get the actual name of the dump file
1821
1822   @rtype: None
1823
1824   """
1825   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1826   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1827
1828   config = objects.SerializableConfigParser()
1829
1830   config.add_section(constants.INISECT_EXP)
1831   config.set(constants.INISECT_EXP, 'version', '0')
1832   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1833   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1834   config.set(constants.INISECT_EXP, 'os', instance.os)
1835   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1836
1837   config.add_section(constants.INISECT_INS)
1838   config.set(constants.INISECT_INS, 'name', instance.name)
1839   config.set(constants.INISECT_INS, 'memory', '%d' %
1840              instance.beparams[constants.BE_MEMORY])
1841   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1842              instance.beparams[constants.BE_VCPUS])
1843   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1844
1845   nic_total = 0
1846   for nic_count, nic in enumerate(instance.nics):
1847     nic_total += 1
1848     config.set(constants.INISECT_INS, 'nic%d_mac' %
1849                nic_count, '%s' % nic.mac)
1850     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1851     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1852                '%s' % nic.bridge)
1853   # TODO: redundant: on load can read nics until it doesn't exist
1854   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1855
1856   disk_total = 0
1857   for disk_count, disk in enumerate(snap_disks):
1858     if disk:
1859       disk_total += 1
1860       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1861                  ('%s' % disk.iv_name))
1862       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1863                  ('%s' % disk.physical_id[1]))
1864       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1865                  ('%d' % disk.size))
1866
1867   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1868
1869   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1870                   data=config.Dumps())
1871   shutil.rmtree(finaldestdir, True)
1872   shutil.move(destdir, finaldestdir)
1873
1874
1875 def ExportInfo(dest):
1876   """Get export configuration information.
1877
1878   @type dest: str
1879   @param dest: directory containing the export
1880
1881   @rtype: L{objects.SerializableConfigParser}
1882   @return: a serializable config file containing the
1883       export info
1884
1885   """
1886   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1887
1888   config = objects.SerializableConfigParser()
1889   config.read(cff)
1890
1891   if (not config.has_section(constants.INISECT_EXP) or
1892       not config.has_section(constants.INISECT_INS)):
1893     _Fail("Export info file doesn't have the required fields")
1894
1895   return config.Dumps()
1896
1897
1898 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1899   """Import an os image into an instance.
1900
1901   @type instance: L{objects.Instance}
1902   @param instance: instance to import the disks into
1903   @type src_node: string
1904   @param src_node: source node for the disk images
1905   @type src_images: list of string
1906   @param src_images: absolute paths of the disk images
1907   @rtype: list of boolean
1908   @return: each boolean represent the success of importing the n-th disk
1909
1910   """
1911   import_env = OSEnvironment(instance)
1912   inst_os = OSFromDisk(instance.os)
1913   import_script = inst_os.import_script
1914
1915   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1916                                         instance.name, int(time.time()))
1917   if not os.path.exists(constants.LOG_OS_DIR):
1918     os.mkdir(constants.LOG_OS_DIR, 0750)
1919
1920   comprcmd = "gunzip"
1921   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1922                                import_script, logfile)
1923
1924   final_result = []
1925   for idx, image in enumerate(src_images):
1926     if image:
1927       destcmd = utils.BuildShellCmd('cat %s', image)
1928       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1929                                                        constants.GANETI_RUNAS,
1930                                                        destcmd)
1931       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1932       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1933       import_env['IMPORT_INDEX'] = str(idx)
1934       result = utils.RunCmd(command, env=import_env)
1935       if result.failed:
1936         logging.error("Disk import command '%s' returned error: %s"
1937                       " output: %s", command, result.fail_reason,
1938                       result.output)
1939         final_result.append("error importing disk %d: %s, %s" %
1940                             (idx, result.fail_reason, result.output[-100]))
1941
1942   if final_result:
1943     _Fail("; ".join(final_result), log=False)
1944
1945
1946 def ListExports():
1947   """Return a list of exports currently available on this machine.
1948
1949   @rtype: list
1950   @return: list of the exports
1951
1952   """
1953   if os.path.isdir(constants.EXPORT_DIR):
1954     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1955   else:
1956     _Fail("No exports directory")
1957
1958
1959 def RemoveExport(export):
1960   """Remove an existing export from the node.
1961
1962   @type export: str
1963   @param export: the name of the export to remove
1964   @rtype: None
1965
1966   """
1967   target = os.path.join(constants.EXPORT_DIR, export)
1968
1969   try:
1970     shutil.rmtree(target)
1971   except EnvironmentError, err:
1972     _Fail("Error while removing the export: %s", err, exc=True)
1973
1974
1975 def BlockdevRename(devlist):
1976   """Rename a list of block devices.
1977
1978   @type devlist: list of tuples
1979   @param devlist: list of tuples of the form  (disk,
1980       new_logical_id, new_physical_id); disk is an
1981       L{objects.Disk} object describing the current disk,
1982       and new logical_id/physical_id is the name we
1983       rename it to
1984   @rtype: boolean
1985   @return: True if all renames succeeded, False otherwise
1986
1987   """
1988   msgs = []
1989   result = True
1990   for disk, unique_id in devlist:
1991     dev = _RecursiveFindBD(disk)
1992     if dev is None:
1993       msgs.append("Can't find device %s in rename" % str(disk))
1994       result = False
1995       continue
1996     try:
1997       old_rpath = dev.dev_path
1998       dev.Rename(unique_id)
1999       new_rpath = dev.dev_path
2000       if old_rpath != new_rpath:
2001         DevCacheManager.RemoveCache(old_rpath)
2002         # FIXME: we should add the new cache information here, like:
2003         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2004         # but we don't have the owner here - maybe parse from existing
2005         # cache? for now, we only lose lvm data when we rename, which
2006         # is less critical than DRBD or MD
2007     except errors.BlockDeviceError, err:
2008       msgs.append("Can't rename device '%s' to '%s': %s" %
2009                   (dev, unique_id, err))
2010       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2011       result = False
2012   if not result:
2013     _Fail("; ".join(msgs))
2014
2015
2016 def _TransformFileStorageDir(file_storage_dir):
2017   """Checks whether given file_storage_dir is valid.
2018
2019   Checks wheter the given file_storage_dir is within the cluster-wide
2020   default file_storage_dir stored in SimpleStore. Only paths under that
2021   directory are allowed.
2022
2023   @type file_storage_dir: str
2024   @param file_storage_dir: the path to check
2025
2026   @return: the normalized path if valid, None otherwise
2027
2028   """
2029   cfg = _GetConfig()
2030   file_storage_dir = os.path.normpath(file_storage_dir)
2031   base_file_storage_dir = cfg.GetFileStorageDir()
2032   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2033       base_file_storage_dir):
2034     _Fail("File storage directory '%s' is not under base file"
2035           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2036   return file_storage_dir
2037
2038
2039 def CreateFileStorageDir(file_storage_dir):
2040   """Create file storage directory.
2041
2042   @type file_storage_dir: str
2043   @param file_storage_dir: directory to create
2044
2045   @rtype: tuple
2046   @return: tuple with first element a boolean indicating wheter dir
2047       creation was successful or not
2048
2049   """
2050   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2051   if os.path.exists(file_storage_dir):
2052     if not os.path.isdir(file_storage_dir):
2053       _Fail("Specified storage dir '%s' is not a directory",
2054             file_storage_dir)
2055   else:
2056     try:
2057       os.makedirs(file_storage_dir, 0750)
2058     except OSError, err:
2059       _Fail("Cannot create file storage directory '%s': %s",
2060             file_storage_dir, err, exc=True)
2061
2062
2063 def RemoveFileStorageDir(file_storage_dir):
2064   """Remove file storage directory.
2065
2066   Remove it only if it's empty. If not log an error and return.
2067
2068   @type file_storage_dir: str
2069   @param file_storage_dir: the directory we should cleanup
2070   @rtype: tuple (success,)
2071   @return: tuple of one element, C{success}, denoting
2072       whether the operation was successfull
2073
2074   """
2075   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2076   if os.path.exists(file_storage_dir):
2077     if not os.path.isdir(file_storage_dir):
2078       _Fail("Specified Storage directory '%s' is not a directory",
2079             file_storage_dir)
2080     # deletes dir only if empty, otherwise we want to fail the rpc call
2081     try:
2082       os.rmdir(file_storage_dir)
2083     except OSError, err:
2084       _Fail("Cannot remove file storage directory '%s': %s",
2085             file_storage_dir, err)
2086
2087
2088 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2089   """Rename the file storage directory.
2090
2091   @type old_file_storage_dir: str
2092   @param old_file_storage_dir: the current path
2093   @type new_file_storage_dir: str
2094   @param new_file_storage_dir: the name we should rename to
2095   @rtype: tuple (success,)
2096   @return: tuple of one element, C{success}, denoting
2097       whether the operation was successful
2098
2099   """
2100   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2101   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2102   if not os.path.exists(new_file_storage_dir):
2103     if os.path.isdir(old_file_storage_dir):
2104       try:
2105         os.rename(old_file_storage_dir, new_file_storage_dir)
2106       except OSError, err:
2107         _Fail("Cannot rename '%s' to '%s': %s",
2108               old_file_storage_dir, new_file_storage_dir, err)
2109     else:
2110       _Fail("Specified storage dir '%s' is not a directory",
2111             old_file_storage_dir)
2112   else:
2113     if os.path.exists(old_file_storage_dir):
2114       _Fail("Cannot rename '%s' to '%s': both locations exist",
2115             old_file_storage_dir, new_file_storage_dir)
2116
2117
2118 def _EnsureJobQueueFile(file_name):
2119   """Checks whether the given filename is in the queue directory.
2120
2121   @type file_name: str
2122   @param file_name: the file name we should check
2123   @rtype: None
2124   @raises RPCFail: if the file is not valid
2125
2126   """
2127   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2128   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2129
2130   if not result:
2131     _Fail("Passed job queue file '%s' does not belong to"
2132           " the queue directory '%s'", file_name, queue_dir)
2133
2134
2135 def JobQueueUpdate(file_name, content):
2136   """Updates a file in the queue directory.
2137
2138   This is just a wrapper over L{utils.WriteFile}, with proper
2139   checking.
2140
2141   @type file_name: str
2142   @param file_name: the job file name
2143   @type content: str
2144   @param content: the new job contents
2145   @rtype: boolean
2146   @return: the success of the operation
2147
2148   """
2149   _EnsureJobQueueFile(file_name)
2150
2151   # Write and replace the file atomically
2152   utils.WriteFile(file_name, data=_Decompress(content))
2153
2154
2155 def JobQueueRename(old, new):
2156   """Renames a job queue file.
2157
2158   This is just a wrapper over os.rename with proper checking.
2159
2160   @type old: str
2161   @param old: the old (actual) file name
2162   @type new: str
2163   @param new: the desired file name
2164   @rtype: tuple
2165   @return: the success of the operation and payload
2166
2167   """
2168   _EnsureJobQueueFile(old)
2169   _EnsureJobQueueFile(new)
2170
2171   utils.RenameFile(old, new, mkdir=True)
2172
2173
2174 def JobQueueSetDrainFlag(drain_flag):
2175   """Set the drain flag for the queue.
2176
2177   This will set or unset the queue drain flag.
2178
2179   @type drain_flag: boolean
2180   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2181   @rtype: truple
2182   @return: always True, None
2183   @warning: the function always returns True
2184
2185   """
2186   if drain_flag:
2187     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2188   else:
2189     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2190
2191
2192 def BlockdevClose(instance_name, disks):
2193   """Closes the given block devices.
2194
2195   This means they will be switched to secondary mode (in case of
2196   DRBD).
2197
2198   @param instance_name: if the argument is not empty, the symlinks
2199       of this instance will be removed
2200   @type disks: list of L{objects.Disk}
2201   @param disks: the list of disks to be closed
2202   @rtype: tuple (success, message)
2203   @return: a tuple of success and message, where success
2204       indicates the succes of the operation, and message
2205       which will contain the error details in case we
2206       failed
2207
2208   """
2209   bdevs = []
2210   for cf in disks:
2211     rd = _RecursiveFindBD(cf)
2212     if rd is None:
2213       _Fail("Can't find device %s", cf)
2214     bdevs.append(rd)
2215
2216   msg = []
2217   for rd in bdevs:
2218     try:
2219       rd.Close()
2220     except errors.BlockDeviceError, err:
2221       msg.append(str(err))
2222   if msg:
2223     _Fail("Can't make devices secondary: %s", ",".join(msg))
2224   else:
2225     if instance_name:
2226       _RemoveBlockDevLinks(instance_name, disks)
2227
2228
2229 def ValidateHVParams(hvname, hvparams):
2230   """Validates the given hypervisor parameters.
2231
2232   @type hvname: string
2233   @param hvname: the hypervisor name
2234   @type hvparams: dict
2235   @param hvparams: the hypervisor parameters to be validated
2236   @rtype: None
2237
2238   """
2239   try:
2240     hv_type = hypervisor.GetHypervisor(hvname)
2241     hv_type.ValidateParameters(hvparams)
2242   except errors.HypervisorError, err:
2243     _Fail(str(err), log=False)
2244
2245
2246 def DemoteFromMC():
2247   """Demotes the current node from master candidate role.
2248
2249   """
2250   # try to ensure we're not the master by mistake
2251   master, myself = ssconf.GetMasterAndMyself()
2252   if master == myself:
2253     _Fail("ssconf status shows I'm the master node, will not demote")
2254   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2255   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2256     _Fail("The master daemon is running, will not demote")
2257   try:
2258     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2259   except EnvironmentError, err:
2260     if err.errno != errno.ENOENT:
2261       _Fail("Error while backing up cluster file: %s", err, exc=True)
2262   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2263
2264
2265 def _FindDisks(nodes_ip, disks):
2266   """Sets the physical ID on disks and returns the block devices.
2267
2268   """
2269   # set the correct physical ID
2270   my_name = utils.HostInfo().name
2271   for cf in disks:
2272     cf.SetPhysicalID(my_name, nodes_ip)
2273
2274   bdevs = []
2275
2276   for cf in disks:
2277     rd = _RecursiveFindBD(cf)
2278     if rd is None:
2279       _Fail("Can't find device %s", cf)
2280     bdevs.append(rd)
2281   return bdevs
2282
2283
2284 def DrbdDisconnectNet(nodes_ip, disks):
2285   """Disconnects the network on a list of drbd devices.
2286
2287   """
2288   bdevs = _FindDisks(nodes_ip, disks)
2289
2290   # disconnect disks
2291   for rd in bdevs:
2292     try:
2293       rd.DisconnectNet()
2294     except errors.BlockDeviceError, err:
2295       _Fail("Can't change network configuration to standalone mode: %s",
2296             err, exc=True)
2297
2298
2299 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2300   """Attaches the network on a list of drbd devices.
2301
2302   """
2303   bdevs = _FindDisks(nodes_ip, disks)
2304
2305   if multimaster:
2306     for idx, rd in enumerate(bdevs):
2307       try:
2308         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2309       except EnvironmentError, err:
2310         _Fail("Can't create symlink: %s", err)
2311   # reconnect disks, switch to new master configuration and if
2312   # needed primary mode
2313   for rd in bdevs:
2314     try:
2315       rd.AttachNet(multimaster)
2316     except errors.BlockDeviceError, err:
2317       _Fail("Can't change network configuration: %s", err)
2318   # wait until the disks are connected; we need to retry the re-attach
2319   # if the device becomes standalone, as this might happen if the one
2320   # node disconnects and reconnects in a different mode before the
2321   # other node reconnects; in this case, one or both of the nodes will
2322   # decide it has wrong configuration and switch to standalone
2323   RECONNECT_TIMEOUT = 2 * 60
2324   sleep_time = 0.100 # start with 100 miliseconds
2325   timeout_limit = time.time() + RECONNECT_TIMEOUT
2326   while time.time() < timeout_limit:
2327     all_connected = True
2328     for rd in bdevs:
2329       stats = rd.GetProcStatus()
2330       if not (stats.is_connected or stats.is_in_resync):
2331         all_connected = False
2332       if stats.is_standalone:
2333         # peer had different config info and this node became
2334         # standalone, even though this should not happen with the
2335         # new staged way of changing disk configs
2336         try:
2337           rd.AttachNet(multimaster)
2338         except errors.BlockDeviceError, err:
2339           _Fail("Can't change network configuration: %s", err)
2340     if all_connected:
2341       break
2342     time.sleep(sleep_time)
2343     sleep_time = min(5, sleep_time * 1.5)
2344   if not all_connected:
2345     _Fail("Timeout in disk reconnecting")
2346   if multimaster:
2347     # change to primary mode
2348     for rd in bdevs:
2349       try:
2350         rd.Open()
2351       except errors.BlockDeviceError, err:
2352         _Fail("Can't change to primary mode: %s", err)
2353
2354
2355 def DrbdWaitSync(nodes_ip, disks):
2356   """Wait until DRBDs have synchronized.
2357
2358   """
2359   bdevs = _FindDisks(nodes_ip, disks)
2360
2361   min_resync = 100
2362   alldone = True
2363   for rd in bdevs:
2364     stats = rd.GetProcStatus()
2365     if not (stats.is_connected or stats.is_in_resync):
2366       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2367     alldone = alldone and (not stats.is_in_resync)
2368     if stats.sync_percent is not None:
2369       min_resync = min(min_resync, stats.sync_percent)
2370
2371   return (alldone, min_resync)
2372
2373
2374 def PowercycleNode(hypervisor_type):
2375   """Hard-powercycle the node.
2376
2377   Because we need to return first, and schedule the powercycle in the
2378   background, we won't be able to report failures nicely.
2379
2380   """
2381   hyper = hypervisor.GetHypervisor(hypervisor_type)
2382   try:
2383     pid = os.fork()
2384   except OSError:
2385     # if we can't fork, we'll pretend that we're in the child process
2386     pid = 0
2387   if pid > 0:
2388     return "Reboot scheduled in 5 seconds"
2389   time.sleep(5)
2390   hyper.PowercycleNode()
2391
2392
2393 class HooksRunner(object):
2394   """Hook runner.
2395
2396   This class is instantiated on the node side (ganeti-noded) and not
2397   on the master side.
2398
2399   """
2400   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2401
2402   def __init__(self, hooks_base_dir=None):
2403     """Constructor for hooks runner.
2404
2405     @type hooks_base_dir: str or None
2406     @param hooks_base_dir: if not None, this overrides the
2407         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2408
2409     """
2410     if hooks_base_dir is None:
2411       hooks_base_dir = constants.HOOKS_BASE_DIR
2412     self._BASE_DIR = hooks_base_dir
2413
2414   @staticmethod
2415   def ExecHook(script, env):
2416     """Exec one hook script.
2417
2418     @type script: str
2419     @param script: the full path to the script
2420     @type env: dict
2421     @param env: the environment with which to exec the script
2422     @rtype: tuple (success, message)
2423     @return: a tuple of success and message, where success
2424         indicates the succes of the operation, and message
2425         which will contain the error details in case we
2426         failed
2427
2428     """
2429     # exec the process using subprocess and log the output
2430     fdstdin = None
2431     try:
2432       fdstdin = open("/dev/null", "r")
2433       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2434                                stderr=subprocess.STDOUT, close_fds=True,
2435                                shell=False, cwd="/", env=env)
2436       output = ""
2437       try:
2438         output = child.stdout.read(4096)
2439         child.stdout.close()
2440       except EnvironmentError, err:
2441         output += "Hook script error: %s" % str(err)
2442
2443       while True:
2444         try:
2445           result = child.wait()
2446           break
2447         except EnvironmentError, err:
2448           if err.errno == errno.EINTR:
2449             continue
2450           raise
2451     finally:
2452       # try not to leak fds
2453       for fd in (fdstdin, ):
2454         if fd is not None:
2455           try:
2456             fd.close()
2457           except EnvironmentError, err:
2458             # just log the error
2459             #logging.exception("Error while closing fd %s", fd)
2460             pass
2461
2462     return result == 0, utils.SafeEncode(output.strip())
2463
2464   def RunHooks(self, hpath, phase, env):
2465     """Run the scripts in the hooks directory.
2466
2467     @type hpath: str
2468     @param hpath: the path to the hooks directory which
2469         holds the scripts
2470     @type phase: str
2471     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2472         L{constants.HOOKS_PHASE_POST}
2473     @type env: dict
2474     @param env: dictionary with the environment for the hook
2475     @rtype: list
2476     @return: list of 3-element tuples:
2477       - script path
2478       - script result, either L{constants.HKR_SUCCESS} or
2479         L{constants.HKR_FAIL}
2480       - output of the script
2481
2482     @raise errors.ProgrammerError: for invalid input
2483         parameters
2484
2485     """
2486     if phase == constants.HOOKS_PHASE_PRE:
2487       suffix = "pre"
2488     elif phase == constants.HOOKS_PHASE_POST:
2489       suffix = "post"
2490     else:
2491       _Fail("Unknown hooks phase '%s'", phase)
2492
2493     rr = []
2494
2495     subdir = "%s-%s.d" % (hpath, suffix)
2496     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2497     try:
2498       dir_contents = utils.ListVisibleFiles(dir_name)
2499     except OSError:
2500       # FIXME: must log output in case of failures
2501       return rr
2502
2503     # we use the standard python sort order,
2504     # so 00name is the recommended naming scheme
2505     dir_contents.sort()
2506     for relname in dir_contents:
2507       fname = os.path.join(dir_name, relname)
2508       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2509           self.RE_MASK.match(relname) is not None):
2510         rrval = constants.HKR_SKIP
2511         output = ""
2512       else:
2513         result, output = self.ExecHook(fname, env)
2514         if not result:
2515           rrval = constants.HKR_FAIL
2516         else:
2517           rrval = constants.HKR_SUCCESS
2518       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2519
2520     return rr
2521
2522
2523 class IAllocatorRunner(object):
2524   """IAllocator runner.
2525
2526   This class is instantiated on the node side (ganeti-noded) and not on
2527   the master side.
2528
2529   """
2530   def Run(self, name, idata):
2531     """Run an iallocator script.
2532
2533     @type name: str
2534     @param name: the iallocator script name
2535     @type idata: str
2536     @param idata: the allocator input data
2537
2538     @rtype: tuple
2539     @return: two element tuple of:
2540        - status
2541        - either error message or stdout of allocator (for success)
2542
2543     """
2544     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2545                                   os.path.isfile)
2546     if alloc_script is None:
2547       _Fail("iallocator module '%s' not found in the search path", name)
2548
2549     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2550     try:
2551       os.write(fd, idata)
2552       os.close(fd)
2553       result = utils.RunCmd([alloc_script, fin_name])
2554       if result.failed:
2555         _Fail("iallocator module '%s' failed: %s, output '%s'",
2556               name, result.fail_reason, result.output)
2557     finally:
2558       os.unlink(fin_name)
2559
2560     return result.stdout
2561
2562
2563 class DevCacheManager(object):
2564   """Simple class for managing a cache of block device information.
2565
2566   """
2567   _DEV_PREFIX = "/dev/"
2568   _ROOT_DIR = constants.BDEV_CACHE_DIR
2569
2570   @classmethod
2571   def _ConvertPath(cls, dev_path):
2572     """Converts a /dev/name path to the cache file name.
2573
2574     This replaces slashes with underscores and strips the /dev
2575     prefix. It then returns the full path to the cache file.
2576
2577     @type dev_path: str
2578     @param dev_path: the C{/dev/} path name
2579     @rtype: str
2580     @return: the converted path name
2581
2582     """
2583     if dev_path.startswith(cls._DEV_PREFIX):
2584       dev_path = dev_path[len(cls._DEV_PREFIX):]
2585     dev_path = dev_path.replace("/", "_")
2586     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2587     return fpath
2588
2589   @classmethod
2590   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2591     """Updates the cache information for a given device.
2592
2593     @type dev_path: str
2594     @param dev_path: the pathname of the device
2595     @type owner: str
2596     @param owner: the owner (instance name) of the device
2597     @type on_primary: bool
2598     @param on_primary: whether this is the primary
2599         node nor not
2600     @type iv_name: str
2601     @param iv_name: the instance-visible name of the
2602         device, as in objects.Disk.iv_name
2603
2604     @rtype: None
2605
2606     """
2607     if dev_path is None:
2608       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2609       return
2610     fpath = cls._ConvertPath(dev_path)
2611     if on_primary:
2612       state = "primary"
2613     else:
2614       state = "secondary"
2615     if iv_name is None:
2616       iv_name = "not_visible"
2617     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2618     try:
2619       utils.WriteFile(fpath, data=fdata)
2620     except EnvironmentError, err:
2621       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2622
2623   @classmethod
2624   def RemoveCache(cls, dev_path):
2625     """Remove data for a dev_path.
2626
2627     This is just a wrapper over L{utils.RemoveFile} with a converted
2628     path name and logging.
2629
2630     @type dev_path: str
2631     @param dev_path: the pathname of the device
2632
2633     @rtype: None
2634
2635     """
2636     if dev_path is None:
2637       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2638       return
2639     fpath = cls._ConvertPath(dev_path)
2640     try:
2641       utils.RemoveFile(fpath)
2642     except EnvironmentError, err:
2643       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)