Daemons conditionally setup console logging
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50
51 from ganeti import errors
52 from ganeti import utils
53 from ganeti import ssh
54 from ganeti import hypervisor
55 from ganeti import constants
56 from ganeti import bdev
57 from ganeti import objects
58 from ganeti import ssconf
59
60
61 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
62 _ALLOWED_CLEAN_DIRS = frozenset([
63   constants.DATA_DIR,
64   constants.JOB_QUEUE_ARCHIVE_DIR,
65   constants.QUEUE_DIR,
66   ])
67
68
69 class RPCFail(Exception):
70   """Class denoting RPC failure.
71
72   Its argument is the error message.
73
74   """
75
76
77 def _Fail(msg, *args, **kwargs):
78   """Log an error and the raise an RPCFail exception.
79
80   This exception is then handled specially in the ganeti daemon and
81   turned into a 'failed' return type. As such, this function is a
82   useful shortcut for logging the error and returning it to the master
83   daemon.
84
85   @type msg: string
86   @param msg: the text of the exception
87   @raise RPCFail
88
89   """
90   if args:
91     msg = msg % args
92   if "log" not in kwargs or kwargs["log"]: # if we should log this error
93     if "exc" in kwargs and kwargs["exc"]:
94       logging.exception(msg)
95     else:
96       logging.error(msg)
97   raise RPCFail(msg)
98
99
100 def _GetConfig():
101   """Simple wrapper to return a SimpleStore.
102
103   @rtype: L{ssconf.SimpleStore}
104   @return: a SimpleStore instance
105
106   """
107   return ssconf.SimpleStore()
108
109
110 def _GetSshRunner(cluster_name):
111   """Simple wrapper to return an SshRunner.
112
113   @type cluster_name: str
114   @param cluster_name: the cluster name, which is needed
115       by the SshRunner constructor
116   @rtype: L{ssh.SshRunner}
117   @return: an SshRunner instance
118
119   """
120   return ssh.SshRunner(cluster_name)
121
122
123 def _Decompress(data):
124   """Unpacks data compressed by the RPC client.
125
126   @type data: list or tuple
127   @param data: Data sent by RPC client
128   @rtype: str
129   @return: Decompressed data
130
131   """
132   assert isinstance(data, (list, tuple))
133   assert len(data) == 2
134   (encoding, content) = data
135   if encoding == constants.RPC_ENCODING_NONE:
136     return content
137   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
138     return zlib.decompress(base64.b64decode(content))
139   else:
140     raise AssertionError("Unknown data encoding")
141
142
143 def _CleanDirectory(path, exclude=None):
144   """Removes all regular files in a directory.
145
146   @type path: str
147   @param path: the directory to clean
148   @type exclude: list
149   @param exclude: list of files to be excluded, defaults
150       to the empty list
151
152   """
153   if path not in _ALLOWED_CLEAN_DIRS:
154     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
155           path)
156
157   if not os.path.isdir(path):
158     return
159   if exclude is None:
160     exclude = []
161   else:
162     # Normalize excluded paths
163     exclude = [os.path.normpath(i) for i in exclude]
164
165   for rel_name in utils.ListVisibleFiles(path):
166     full_name = utils.PathJoin(path, rel_name)
167     if full_name in exclude:
168       continue
169     if os.path.isfile(full_name) and not os.path.islink(full_name):
170       utils.RemoveFile(full_name)
171
172
173 def _BuildUploadFileList():
174   """Build the list of allowed upload files.
175
176   This is abstracted so that it's built only once at module import time.
177
178   """
179   allowed_files = set([
180     constants.CLUSTER_CONF_FILE,
181     constants.ETC_HOSTS,
182     constants.SSH_KNOWN_HOSTS_FILE,
183     constants.VNC_PASSWORD_FILE,
184     constants.RAPI_CERT_FILE,
185     constants.RAPI_USERS_FILE,
186     constants.CONFD_HMAC_KEY,
187     ])
188
189   for hv_name in constants.HYPER_TYPES:
190     hv_class = hypervisor.GetHypervisorClass(hv_name)
191     allowed_files.update(hv_class.GetAncillaryFiles())
192
193   return frozenset(allowed_files)
194
195
196 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
197
198
199 def JobQueuePurge():
200   """Removes job queue files and archived jobs.
201
202   @rtype: tuple
203   @return: True, None
204
205   """
206   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
207   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
208
209
210 def GetMasterInfo():
211   """Returns master information.
212
213   This is an utility function to compute master information, either
214   for consumption here or from the node daemon.
215
216   @rtype: tuple
217   @return: master_netdev, master_ip, master_name
218   @raise RPCFail: in case of errors
219
220   """
221   try:
222     cfg = _GetConfig()
223     master_netdev = cfg.GetMasterNetdev()
224     master_ip = cfg.GetMasterIP()
225     master_node = cfg.GetMasterNode()
226   except errors.ConfigurationError, err:
227     _Fail("Cluster configuration incomplete: %s", err, exc=True)
228   return (master_netdev, master_ip, master_node)
229
230
231 def StartMaster(start_daemons, no_voting):
232   """Activate local node as master node.
233
234   The function will always try activate the IP address of the master
235   (unless someone else has it). It will also start the master daemons,
236   based on the start_daemons parameter.
237
238   @type start_daemons: boolean
239   @param start_daemons: whether to also start the master
240       daemons (ganeti-masterd and ganeti-rapi)
241   @type no_voting: boolean
242   @param no_voting: whether to start ganeti-masterd without a node vote
243       (if start_daemons is True), but still non-interactively
244   @rtype: None
245
246   """
247   # GetMasterInfo will raise an exception if not able to return data
248   master_netdev, master_ip, _ = GetMasterInfo()
249
250   err_msgs = []
251   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
252     if utils.OwnIpAddress(master_ip):
253       # we already have the ip:
254       logging.debug("Master IP already configured, doing nothing")
255     else:
256       msg = "Someone else has the master ip, not activating"
257       logging.error(msg)
258       err_msgs.append(msg)
259   else:
260     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
261                            "dev", master_netdev, "label",
262                            "%s:0" % master_netdev])
263     if result.failed:
264       msg = "Can't activate master IP: %s" % result.output
265       logging.error(msg)
266       err_msgs.append(msg)
267
268     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
269                            "-s", master_ip, master_ip])
270     # we'll ignore the exit code of arping
271
272   # and now start the master and rapi daemons
273   if start_daemons:
274     if no_voting:
275       masterd_args = "--no-voting --yes-do-it"
276     else:
277       masterd_args = ""
278
279     env = {
280       "EXTRA_MASTERD_ARGS": masterd_args,
281       }
282
283     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
284     if result.failed:
285       msg = "Can't start Ganeti master: %s" % result.output
286       logging.error(msg)
287       err_msgs.append(msg)
288
289   if err_msgs:
290     _Fail("; ".join(err_msgs))
291
292
293 def StopMaster(stop_daemons):
294   """Deactivate this node as master.
295
296   The function will always try to deactivate the IP address of the
297   master. It will also stop the master daemons depending on the
298   stop_daemons parameter.
299
300   @type stop_daemons: boolean
301   @param stop_daemons: whether to also stop the master daemons
302       (ganeti-masterd and ganeti-rapi)
303   @rtype: None
304
305   """
306   # TODO: log and report back to the caller the error failures; we
307   # need to decide in which case we fail the RPC for this
308
309   # GetMasterInfo will raise an exception if not able to return data
310   master_netdev, master_ip, _ = GetMasterInfo()
311
312   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
313                          "dev", master_netdev])
314   if result.failed:
315     logging.error("Can't remove the master IP, error: %s", result.output)
316     # but otherwise ignore the failure
317
318   if stop_daemons:
319     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
320     if result.failed:
321       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
322                     " and error %s",
323                     result.cmd, result.exit_code, result.output)
324
325
326 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
327   """Joins this node to the cluster.
328
329   This does the following:
330       - updates the hostkeys of the machine (rsa and dsa)
331       - adds the ssh private key to the user
332       - adds the ssh public key to the users' authorized_keys file
333
334   @type dsa: str
335   @param dsa: the DSA private key to write
336   @type dsapub: str
337   @param dsapub: the DSA public key to write
338   @type rsa: str
339   @param rsa: the RSA private key to write
340   @type rsapub: str
341   @param rsapub: the RSA public key to write
342   @type sshkey: str
343   @param sshkey: the SSH private key to write
344   @type sshpub: str
345   @param sshpub: the SSH public key to write
346   @rtype: boolean
347   @return: the success of the operation
348
349   """
350   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
351                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
352                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
353                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
354   for name, content, mode in sshd_keys:
355     utils.WriteFile(name, data=content, mode=mode)
356
357   try:
358     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
359                                                     mkdir=True)
360   except errors.OpExecError, err:
361     _Fail("Error while processing user ssh files: %s", err, exc=True)
362
363   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
364     utils.WriteFile(name, data=content, mode=0600)
365
366   utils.AddAuthorizedKey(auth_keys, sshpub)
367
368   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
369   if result.failed:
370     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
371           result.cmd, result.exit_code, result.output)
372
373
374 def LeaveCluster(modify_ssh_setup):
375   """Cleans up and remove the current node.
376
377   This function cleans up and prepares the current node to be removed
378   from the cluster.
379
380   If processing is successful, then it raises an
381   L{errors.QuitGanetiException} which is used as a special case to
382   shutdown the node daemon.
383
384   @param modify_ssh_setup: boolean
385
386   """
387   _CleanDirectory(constants.DATA_DIR)
388   JobQueuePurge()
389
390   if modify_ssh_setup:
391     try:
392       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
393
394       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
395
396       utils.RemoveFile(priv_key)
397       utils.RemoveFile(pub_key)
398     except errors.OpExecError:
399       logging.exception("Error while processing ssh files")
400
401   try:
402     utils.RemoveFile(constants.CONFD_HMAC_KEY)
403     utils.RemoveFile(constants.RAPI_CERT_FILE)
404     utils.RemoveFile(constants.NODED_CERT_FILE)
405   except: # pylint: disable-msg=W0702
406     logging.exception("Error while removing cluster secrets")
407
408   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
409   if result.failed:
410     logging.error("Command %s failed with exitcode %s and error %s",
411                   result.cmd, result.exit_code, result.output)
412
413   # Raise a custom exception (handled in ganeti-noded)
414   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
415
416
417 def GetNodeInfo(vgname, hypervisor_type):
418   """Gives back a hash with different information about the node.
419
420   @type vgname: C{string}
421   @param vgname: the name of the volume group to ask for disk space information
422   @type hypervisor_type: C{str}
423   @param hypervisor_type: the name of the hypervisor to ask for
424       memory information
425   @rtype: C{dict}
426   @return: dictionary with the following keys:
427       - vg_size is the size of the configured volume group in MiB
428       - vg_free is the free size of the volume group in MiB
429       - memory_dom0 is the memory allocated for domain0 in MiB
430       - memory_free is the currently available (free) ram in MiB
431       - memory_total is the total number of ram in MiB
432
433   """
434   outputarray = {}
435   vginfo = _GetVGInfo(vgname)
436   outputarray['vg_size'] = vginfo['vg_size']
437   outputarray['vg_free'] = vginfo['vg_free']
438
439   hyper = hypervisor.GetHypervisor(hypervisor_type)
440   hyp_info = hyper.GetNodeInfo()
441   if hyp_info is not None:
442     outputarray.update(hyp_info)
443
444   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
445
446   return outputarray
447
448
449 def VerifyNode(what, cluster_name):
450   """Verify the status of the local node.
451
452   Based on the input L{what} parameter, various checks are done on the
453   local node.
454
455   If the I{filelist} key is present, this list of
456   files is checksummed and the file/checksum pairs are returned.
457
458   If the I{nodelist} key is present, we check that we have
459   connectivity via ssh with the target nodes (and check the hostname
460   report).
461
462   If the I{node-net-test} key is present, we check that we have
463   connectivity to the given nodes via both primary IP and, if
464   applicable, secondary IPs.
465
466   @type what: C{dict}
467   @param what: a dictionary of things to check:
468       - filelist: list of files for which to compute checksums
469       - nodelist: list of nodes we should check ssh communication with
470       - node-net-test: list of nodes we should check node daemon port
471         connectivity with
472       - hypervisor: list with hypervisors to run the verify for
473   @rtype: dict
474   @return: a dictionary with the same keys as the input dict, and
475       values representing the result of the checks
476
477   """
478   result = {}
479
480   if constants.NV_HYPERVISOR in what:
481     result[constants.NV_HYPERVISOR] = tmp = {}
482     for hv_name in what[constants.NV_HYPERVISOR]:
483       try:
484         val = hypervisor.GetHypervisor(hv_name).Verify()
485       except errors.HypervisorError, err:
486         val = "Error while checking hypervisor: %s" % str(err)
487       tmp[hv_name] = val
488
489   if constants.NV_FILELIST in what:
490     result[constants.NV_FILELIST] = utils.FingerprintFiles(
491       what[constants.NV_FILELIST])
492
493   if constants.NV_NODELIST in what:
494     result[constants.NV_NODELIST] = tmp = {}
495     random.shuffle(what[constants.NV_NODELIST])
496     for node in what[constants.NV_NODELIST]:
497       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
498       if not success:
499         tmp[node] = message
500
501   if constants.NV_NODENETTEST in what:
502     result[constants.NV_NODENETTEST] = tmp = {}
503     my_name = utils.HostInfo().name
504     my_pip = my_sip = None
505     for name, pip, sip in what[constants.NV_NODENETTEST]:
506       if name == my_name:
507         my_pip = pip
508         my_sip = sip
509         break
510     if not my_pip:
511       tmp[my_name] = ("Can't find my own primary/secondary IP"
512                       " in the node list")
513     else:
514       port = utils.GetDaemonPort(constants.NODED)
515       for name, pip, sip in what[constants.NV_NODENETTEST]:
516         fail = []
517         if not utils.TcpPing(pip, port, source=my_pip):
518           fail.append("primary")
519         if sip != pip:
520           if not utils.TcpPing(sip, port, source=my_sip):
521             fail.append("secondary")
522         if fail:
523           tmp[name] = ("failure using the %s interface(s)" %
524                        " and ".join(fail))
525
526   if constants.NV_LVLIST in what:
527     try:
528       val = GetVolumeList(what[constants.NV_LVLIST])
529     except RPCFail, err:
530       val = str(err)
531     result[constants.NV_LVLIST] = val
532
533   if constants.NV_INSTANCELIST in what:
534     # GetInstanceList can fail
535     try:
536       val = GetInstanceList(what[constants.NV_INSTANCELIST])
537     except RPCFail, err:
538       val = str(err)
539     result[constants.NV_INSTANCELIST] = val
540
541   if constants.NV_VGLIST in what:
542     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
543
544   if constants.NV_PVLIST in what:
545     result[constants.NV_PVLIST] = \
546       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
547                                    filter_allocatable=False)
548
549   if constants.NV_VERSION in what:
550     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
551                                     constants.RELEASE_VERSION)
552
553   if constants.NV_HVINFO in what:
554     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
555     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
556
557   if constants.NV_DRBDLIST in what:
558     try:
559       used_minors = bdev.DRBD8.GetUsedDevs().keys()
560     except errors.BlockDeviceError, err:
561       logging.warning("Can't get used minors list", exc_info=True)
562       used_minors = str(err)
563     result[constants.NV_DRBDLIST] = used_minors
564
565   if constants.NV_NODESETUP in what:
566     result[constants.NV_NODESETUP] = tmpr = []
567     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
568       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
569                   " under /sys, missing required directories /sys/block"
570                   " and /sys/class/net")
571     if (not os.path.isdir("/proc/sys") or
572         not os.path.isfile("/proc/sysrq-trigger")):
573       tmpr.append("The procfs filesystem doesn't seem to be mounted"
574                   " under /proc, missing required directory /proc/sys and"
575                   " the file /proc/sysrq-trigger")
576
577   if constants.NV_TIME in what:
578     result[constants.NV_TIME] = utils.SplitTime(time.time())
579
580   return result
581
582
583 def GetVolumeList(vg_name):
584   """Compute list of logical volumes and their size.
585
586   @type vg_name: str
587   @param vg_name: the volume group whose LVs we should list
588   @rtype: dict
589   @return:
590       dictionary of all partions (key) with value being a tuple of
591       their size (in MiB), inactive and online status::
592
593         {'test1': ('20.06', True, True)}
594
595       in case of errors, a string is returned with the error
596       details.
597
598   """
599   lvs = {}
600   sep = '|'
601   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
602                          "--separator=%s" % sep,
603                          "-olv_name,lv_size,lv_attr", vg_name])
604   if result.failed:
605     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
606
607   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
608   for line in result.stdout.splitlines():
609     line = line.strip()
610     match = valid_line_re.match(line)
611     if not match:
612       logging.error("Invalid line returned from lvs output: '%s'", line)
613       continue
614     name, size, attr = match.groups()
615     inactive = attr[4] == '-'
616     online = attr[5] == 'o'
617     virtual = attr[0] == 'v'
618     if virtual:
619       # we don't want to report such volumes as existing, since they
620       # don't really hold data
621       continue
622     lvs[name] = (size, inactive, online)
623
624   return lvs
625
626
627 def ListVolumeGroups():
628   """List the volume groups and their size.
629
630   @rtype: dict
631   @return: dictionary with keys volume name and values the
632       size of the volume
633
634   """
635   return utils.ListVolumeGroups()
636
637
638 def NodeVolumes():
639   """List all volumes on this node.
640
641   @rtype: list
642   @return:
643     A list of dictionaries, each having four keys:
644       - name: the logical volume name,
645       - size: the size of the logical volume
646       - dev: the physical device on which the LV lives
647       - vg: the volume group to which it belongs
648
649     In case of errors, we return an empty list and log the
650     error.
651
652     Note that since a logical volume can live on multiple physical
653     volumes, the resulting list might include a logical volume
654     multiple times.
655
656   """
657   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
658                          "--separator=|",
659                          "--options=lv_name,lv_size,devices,vg_name"])
660   if result.failed:
661     _Fail("Failed to list logical volumes, lvs output: %s",
662           result.output)
663
664   def parse_dev(dev):
665     return dev.split('(')[0]
666
667   def handle_dev(dev):
668     return [parse_dev(x) for x in dev.split(",")]
669
670   def map_line(line):
671     line = [v.strip() for v in line]
672     return [{'name': line[0], 'size': line[1],
673              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
674
675   all_devs = []
676   for line in result.stdout.splitlines():
677     if line.count('|') >= 3:
678       all_devs.extend(map_line(line.split('|')))
679     else:
680       logging.warning("Strange line in the output from lvs: '%s'", line)
681   return all_devs
682
683
684 def BridgesExist(bridges_list):
685   """Check if a list of bridges exist on the current node.
686
687   @rtype: boolean
688   @return: C{True} if all of them exist, C{False} otherwise
689
690   """
691   missing = []
692   for bridge in bridges_list:
693     if not utils.BridgeExists(bridge):
694       missing.append(bridge)
695
696   if missing:
697     _Fail("Missing bridges %s", utils.CommaJoin(missing))
698
699
700 def GetInstanceList(hypervisor_list):
701   """Provides a list of instances.
702
703   @type hypervisor_list: list
704   @param hypervisor_list: the list of hypervisors to query information
705
706   @rtype: list
707   @return: a list of all running instances on the current node
708     - instance1.example.com
709     - instance2.example.com
710
711   """
712   results = []
713   for hname in hypervisor_list:
714     try:
715       names = hypervisor.GetHypervisor(hname).ListInstances()
716       results.extend(names)
717     except errors.HypervisorError, err:
718       _Fail("Error enumerating instances (hypervisor %s): %s",
719             hname, err, exc=True)
720
721   return results
722
723
724 def GetInstanceInfo(instance, hname):
725   """Gives back the information about an instance as a dictionary.
726
727   @type instance: string
728   @param instance: the instance name
729   @type hname: string
730   @param hname: the hypervisor type of the instance
731
732   @rtype: dict
733   @return: dictionary with the following keys:
734       - memory: memory size of instance (int)
735       - state: xen state of instance (string)
736       - time: cpu time of instance (float)
737
738   """
739   output = {}
740
741   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
742   if iinfo is not None:
743     output['memory'] = iinfo[2]
744     output['state'] = iinfo[4]
745     output['time'] = iinfo[5]
746
747   return output
748
749
750 def GetInstanceMigratable(instance):
751   """Gives whether an instance can be migrated.
752
753   @type instance: L{objects.Instance}
754   @param instance: object representing the instance to be checked.
755
756   @rtype: tuple
757   @return: tuple of (result, description) where:
758       - result: whether the instance can be migrated or not
759       - description: a description of the issue, if relevant
760
761   """
762   hyper = hypervisor.GetHypervisor(instance.hypervisor)
763   iname = instance.name
764   if iname not in hyper.ListInstances():
765     _Fail("Instance %s is not running", iname)
766
767   for idx in range(len(instance.disks)):
768     link_name = _GetBlockDevSymlinkPath(iname, idx)
769     if not os.path.islink(link_name):
770       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
771
772
773 def GetAllInstancesInfo(hypervisor_list):
774   """Gather data about all instances.
775
776   This is the equivalent of L{GetInstanceInfo}, except that it
777   computes data for all instances at once, thus being faster if one
778   needs data about more than one instance.
779
780   @type hypervisor_list: list
781   @param hypervisor_list: list of hypervisors to query for instance data
782
783   @rtype: dict
784   @return: dictionary of instance: data, with data having the following keys:
785       - memory: memory size of instance (int)
786       - state: xen state of instance (string)
787       - time: cpu time of instance (float)
788       - vcpus: the number of vcpus
789
790   """
791   output = {}
792
793   for hname in hypervisor_list:
794     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
795     if iinfo:
796       for name, _, memory, vcpus, state, times in iinfo:
797         value = {
798           'memory': memory,
799           'vcpus': vcpus,
800           'state': state,
801           'time': times,
802           }
803         if name in output:
804           # we only check static parameters, like memory and vcpus,
805           # and not state and time which can change between the
806           # invocations of the different hypervisors
807           for key in 'memory', 'vcpus':
808             if value[key] != output[name][key]:
809               _Fail("Instance %s is running twice"
810                     " with different parameters", name)
811         output[name] = value
812
813   return output
814
815
816 def _InstanceLogName(kind, os_name, instance):
817   """Compute the OS log filename for a given instance and operation.
818
819   The instance name and os name are passed in as strings since not all
820   operations have these as part of an instance object.
821
822   @type kind: string
823   @param kind: the operation type (e.g. add, import, etc.)
824   @type os_name: string
825   @param os_name: the os name
826   @type instance: string
827   @param instance: the name of the instance being imported/added/etc.
828
829   """
830   base = ("%s-%s-%s-%s.log" %
831           (kind, os_name, instance, utils.TimestampForFilename()))
832   return utils.PathJoin(constants.LOG_OS_DIR, base)
833
834
835 def InstanceOsAdd(instance, reinstall, debug):
836   """Add an OS to an instance.
837
838   @type instance: L{objects.Instance}
839   @param instance: Instance whose OS is to be installed
840   @type reinstall: boolean
841   @param reinstall: whether this is an instance reinstall
842   @type debug: integer
843   @param debug: debug level, passed to the OS scripts
844   @rtype: None
845
846   """
847   inst_os = OSFromDisk(instance.os)
848
849   create_env = OSEnvironment(instance, inst_os, debug)
850   if reinstall:
851     create_env['INSTANCE_REINSTALL'] = "1"
852
853   logfile = _InstanceLogName("add", instance.os, instance.name)
854
855   result = utils.RunCmd([inst_os.create_script], env=create_env,
856                         cwd=inst_os.path, output=logfile,)
857   if result.failed:
858     logging.error("os create command '%s' returned error: %s, logfile: %s,"
859                   " output: %s", result.cmd, result.fail_reason, logfile,
860                   result.output)
861     lines = [utils.SafeEncode(val)
862              for val in utils.TailFile(logfile, lines=20)]
863     _Fail("OS create script failed (%s), last lines in the"
864           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
865
866
867 def RunRenameInstance(instance, old_name, debug):
868   """Run the OS rename script for an instance.
869
870   @type instance: L{objects.Instance}
871   @param instance: Instance whose OS is to be installed
872   @type old_name: string
873   @param old_name: previous instance name
874   @type debug: integer
875   @param debug: debug level, passed to the OS scripts
876   @rtype: boolean
877   @return: the success of the operation
878
879   """
880   inst_os = OSFromDisk(instance.os)
881
882   rename_env = OSEnvironment(instance, inst_os, debug)
883   rename_env['OLD_INSTANCE_NAME'] = old_name
884
885   logfile = _InstanceLogName("rename", instance.os,
886                              "%s-%s" % (old_name, instance.name))
887
888   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
889                         cwd=inst_os.path, output=logfile)
890
891   if result.failed:
892     logging.error("os create command '%s' returned error: %s output: %s",
893                   result.cmd, result.fail_reason, result.output)
894     lines = [utils.SafeEncode(val)
895              for val in utils.TailFile(logfile, lines=20)]
896     _Fail("OS rename script failed (%s), last lines in the"
897           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
898
899
900 def _GetVGInfo(vg_name):
901   """Get information about the volume group.
902
903   @type vg_name: str
904   @param vg_name: the volume group which we query
905   @rtype: dict
906   @return:
907     A dictionary with the following keys:
908       - C{vg_size} is the total size of the volume group in MiB
909       - C{vg_free} is the free size of the volume group in MiB
910       - C{pv_count} are the number of physical disks in that VG
911
912     If an error occurs during gathering of data, we return the same dict
913     with keys all set to None.
914
915   """
916   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
917
918   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
919                          "--nosuffix", "--units=m", "--separator=:", vg_name])
920
921   if retval.failed:
922     logging.error("volume group %s not present", vg_name)
923     return retdic
924   valarr = retval.stdout.strip().rstrip(':').split(':')
925   if len(valarr) == 3:
926     try:
927       retdic = {
928         "vg_size": int(round(float(valarr[0]), 0)),
929         "vg_free": int(round(float(valarr[1]), 0)),
930         "pv_count": int(valarr[2]),
931         }
932     except (TypeError, ValueError), err:
933       logging.exception("Fail to parse vgs output: %s", err)
934   else:
935     logging.error("vgs output has the wrong number of fields (expected"
936                   " three): %s", str(valarr))
937   return retdic
938
939
940 def _GetBlockDevSymlinkPath(instance_name, idx):
941   return utils.PathJoin(constants.DISK_LINKS_DIR,
942                         "%s:%d" % (instance_name, idx))
943
944
945 def _SymlinkBlockDev(instance_name, device_path, idx):
946   """Set up symlinks to a instance's block device.
947
948   This is an auxiliary function run when an instance is start (on the primary
949   node) or when an instance is migrated (on the target node).
950
951
952   @param instance_name: the name of the target instance
953   @param device_path: path of the physical block device, on the node
954   @param idx: the disk index
955   @return: absolute path to the disk's symlink
956
957   """
958   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
959   try:
960     os.symlink(device_path, link_name)
961   except OSError, err:
962     if err.errno == errno.EEXIST:
963       if (not os.path.islink(link_name) or
964           os.readlink(link_name) != device_path):
965         os.remove(link_name)
966         os.symlink(device_path, link_name)
967     else:
968       raise
969
970   return link_name
971
972
973 def _RemoveBlockDevLinks(instance_name, disks):
974   """Remove the block device symlinks belonging to the given instance.
975
976   """
977   for idx, _ in enumerate(disks):
978     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
979     if os.path.islink(link_name):
980       try:
981         os.remove(link_name)
982       except OSError:
983         logging.exception("Can't remove symlink '%s'", link_name)
984
985
986 def _GatherAndLinkBlockDevs(instance):
987   """Set up an instance's block device(s).
988
989   This is run on the primary node at instance startup. The block
990   devices must be already assembled.
991
992   @type instance: L{objects.Instance}
993   @param instance: the instance whose disks we shoul assemble
994   @rtype: list
995   @return: list of (disk_object, device_path)
996
997   """
998   block_devices = []
999   for idx, disk in enumerate(instance.disks):
1000     device = _RecursiveFindBD(disk)
1001     if device is None:
1002       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1003                                     str(disk))
1004     device.Open()
1005     try:
1006       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1007     except OSError, e:
1008       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1009                                     e.strerror)
1010
1011     block_devices.append((disk, link_name))
1012
1013   return block_devices
1014
1015
1016 def StartInstance(instance):
1017   """Start an instance.
1018
1019   @type instance: L{objects.Instance}
1020   @param instance: the instance object
1021   @rtype: None
1022
1023   """
1024   running_instances = GetInstanceList([instance.hypervisor])
1025
1026   if instance.name in running_instances:
1027     logging.info("Instance %s already running, not starting", instance.name)
1028     return
1029
1030   try:
1031     block_devices = _GatherAndLinkBlockDevs(instance)
1032     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1033     hyper.StartInstance(instance, block_devices)
1034   except errors.BlockDeviceError, err:
1035     _Fail("Block device error: %s", err, exc=True)
1036   except errors.HypervisorError, err:
1037     _RemoveBlockDevLinks(instance.name, instance.disks)
1038     _Fail("Hypervisor error: %s", err, exc=True)
1039
1040
1041 def InstanceShutdown(instance, timeout):
1042   """Shut an instance down.
1043
1044   @note: this functions uses polling with a hardcoded timeout.
1045
1046   @type instance: L{objects.Instance}
1047   @param instance: the instance object
1048   @type timeout: integer
1049   @param timeout: maximum timeout for soft shutdown
1050   @rtype: None
1051
1052   """
1053   hv_name = instance.hypervisor
1054   hyper = hypervisor.GetHypervisor(hv_name)
1055   iname = instance.name
1056
1057   if instance.name not in hyper.ListInstances():
1058     logging.info("Instance %s not running, doing nothing", iname)
1059     return
1060
1061   class _TryShutdown:
1062     def __init__(self):
1063       self.tried_once = False
1064
1065     def __call__(self):
1066       if iname not in hyper.ListInstances():
1067         return
1068
1069       try:
1070         hyper.StopInstance(instance, retry=self.tried_once)
1071       except errors.HypervisorError, err:
1072         if iname not in hyper.ListInstances():
1073           # if the instance is no longer existing, consider this a
1074           # success and go to cleanup
1075           return
1076
1077         _Fail("Failed to stop instance %s: %s", iname, err)
1078
1079       self.tried_once = True
1080
1081       raise utils.RetryAgain()
1082
1083   try:
1084     utils.Retry(_TryShutdown(), 5, timeout)
1085   except utils.RetryTimeout:
1086     # the shutdown did not succeed
1087     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1088
1089     try:
1090       hyper.StopInstance(instance, force=True)
1091     except errors.HypervisorError, err:
1092       if iname in hyper.ListInstances():
1093         # only raise an error if the instance still exists, otherwise
1094         # the error could simply be "instance ... unknown"!
1095         _Fail("Failed to force stop instance %s: %s", iname, err)
1096
1097     time.sleep(1)
1098
1099     if iname in hyper.ListInstances():
1100       _Fail("Could not shutdown instance %s even by destroy", iname)
1101
1102   try:
1103     hyper.CleanupInstance(instance.name)
1104   except errors.HypervisorError, err:
1105     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1106
1107   _RemoveBlockDevLinks(iname, instance.disks)
1108
1109
1110 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1111   """Reboot an instance.
1112
1113   @type instance: L{objects.Instance}
1114   @param instance: the instance object to reboot
1115   @type reboot_type: str
1116   @param reboot_type: the type of reboot, one the following
1117     constants:
1118       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1119         instance OS, do not recreate the VM
1120       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1121         restart the VM (at the hypervisor level)
1122       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1123         not accepted here, since that mode is handled differently, in
1124         cmdlib, and translates into full stop and start of the
1125         instance (instead of a call_instance_reboot RPC)
1126   @type shutdown_timeout: integer
1127   @param shutdown_timeout: maximum timeout for soft shutdown
1128   @rtype: None
1129
1130   """
1131   running_instances = GetInstanceList([instance.hypervisor])
1132
1133   if instance.name not in running_instances:
1134     _Fail("Cannot reboot instance %s that is not running", instance.name)
1135
1136   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1137   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138     try:
1139       hyper.RebootInstance(instance)
1140     except errors.HypervisorError, err:
1141       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1142   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143     try:
1144       InstanceShutdown(instance, shutdown_timeout)
1145       return StartInstance(instance)
1146     except errors.HypervisorError, err:
1147       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148   else:
1149     _Fail("Invalid reboot_type received: %s", reboot_type)
1150
1151
1152 def MigrationInfo(instance):
1153   """Gather information about an instance to be migrated.
1154
1155   @type instance: L{objects.Instance}
1156   @param instance: the instance definition
1157
1158   """
1159   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160   try:
1161     info = hyper.MigrationInfo(instance)
1162   except errors.HypervisorError, err:
1163     _Fail("Failed to fetch migration information: %s", err, exc=True)
1164   return info
1165
1166
1167 def AcceptInstance(instance, info, target):
1168   """Prepare the node to accept an instance.
1169
1170   @type instance: L{objects.Instance}
1171   @param instance: the instance definition
1172   @type info: string/data (opaque)
1173   @param info: migration information, from the source node
1174   @type target: string
1175   @param target: target host (usually ip), on this node
1176
1177   """
1178   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179   try:
1180     hyper.AcceptInstance(instance, info, target)
1181   except errors.HypervisorError, err:
1182     _Fail("Failed to accept instance: %s", err, exc=True)
1183
1184
1185 def FinalizeMigration(instance, info, success):
1186   """Finalize any preparation to accept an instance.
1187
1188   @type instance: L{objects.Instance}
1189   @param instance: the instance definition
1190   @type info: string/data (opaque)
1191   @param info: migration information, from the source node
1192   @type success: boolean
1193   @param success: whether the migration was a success or a failure
1194
1195   """
1196   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197   try:
1198     hyper.FinalizeMigration(instance, info, success)
1199   except errors.HypervisorError, err:
1200     _Fail("Failed to finalize migration: %s", err, exc=True)
1201
1202
1203 def MigrateInstance(instance, target, live):
1204   """Migrates an instance to another node.
1205
1206   @type instance: L{objects.Instance}
1207   @param instance: the instance definition
1208   @type target: string
1209   @param target: the target node name
1210   @type live: boolean
1211   @param live: whether the migration should be done live or not (the
1212       interpretation of this parameter is left to the hypervisor)
1213   @rtype: tuple
1214   @return: a tuple of (success, msg) where:
1215       - succes is a boolean denoting the success/failure of the operation
1216       - msg is a string with details in case of failure
1217
1218   """
1219   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1220
1221   try:
1222     hyper.MigrateInstance(instance, target, live)
1223   except errors.HypervisorError, err:
1224     _Fail("Failed to migrate instance: %s", err, exc=True)
1225
1226
1227 def BlockdevCreate(disk, size, owner, on_primary, info):
1228   """Creates a block device for an instance.
1229
1230   @type disk: L{objects.Disk}
1231   @param disk: the object describing the disk we should create
1232   @type size: int
1233   @param size: the size of the physical underlying device, in MiB
1234   @type owner: str
1235   @param owner: the name of the instance for which disk is created,
1236       used for device cache data
1237   @type on_primary: boolean
1238   @param on_primary:  indicates if it is the primary node or not
1239   @type info: string
1240   @param info: string that will be sent to the physical device
1241       creation, used for example to set (LVM) tags on LVs
1242
1243   @return: the new unique_id of the device (this can sometime be
1244       computed only after creation), or None. On secondary nodes,
1245       it's not required to return anything.
1246
1247   """
1248   # TODO: remove the obsolete 'size' argument
1249   # pylint: disable-msg=W0613
1250   clist = []
1251   if disk.children:
1252     for child in disk.children:
1253       try:
1254         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1255       except errors.BlockDeviceError, err:
1256         _Fail("Can't assemble device %s: %s", child, err)
1257       if on_primary or disk.AssembleOnSecondary():
1258         # we need the children open in case the device itself has to
1259         # be assembled
1260         try:
1261           # pylint: disable-msg=E1103
1262           crdev.Open()
1263         except errors.BlockDeviceError, err:
1264           _Fail("Can't make child '%s' read-write: %s", child, err)
1265       clist.append(crdev)
1266
1267   try:
1268     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1269   except errors.BlockDeviceError, err:
1270     _Fail("Can't create block device: %s", err)
1271
1272   if on_primary or disk.AssembleOnSecondary():
1273     try:
1274       device.Assemble()
1275     except errors.BlockDeviceError, err:
1276       _Fail("Can't assemble device after creation, unusual event: %s", err)
1277     device.SetSyncSpeed(constants.SYNC_SPEED)
1278     if on_primary or disk.OpenOnSecondary():
1279       try:
1280         device.Open(force=True)
1281       except errors.BlockDeviceError, err:
1282         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1283     DevCacheManager.UpdateCache(device.dev_path, owner,
1284                                 on_primary, disk.iv_name)
1285
1286   device.SetInfo(info)
1287
1288   return device.unique_id
1289
1290
1291 def BlockdevRemove(disk):
1292   """Remove a block device.
1293
1294   @note: This is intended to be called recursively.
1295
1296   @type disk: L{objects.Disk}
1297   @param disk: the disk object we should remove
1298   @rtype: boolean
1299   @return: the success of the operation
1300
1301   """
1302   msgs = []
1303   try:
1304     rdev = _RecursiveFindBD(disk)
1305   except errors.BlockDeviceError, err:
1306     # probably can't attach
1307     logging.info("Can't attach to device %s in remove", disk)
1308     rdev = None
1309   if rdev is not None:
1310     r_path = rdev.dev_path
1311     try:
1312       rdev.Remove()
1313     except errors.BlockDeviceError, err:
1314       msgs.append(str(err))
1315     if not msgs:
1316       DevCacheManager.RemoveCache(r_path)
1317
1318   if disk.children:
1319     for child in disk.children:
1320       try:
1321         BlockdevRemove(child)
1322       except RPCFail, err:
1323         msgs.append(str(err))
1324
1325   if msgs:
1326     _Fail("; ".join(msgs))
1327
1328
1329 def _RecursiveAssembleBD(disk, owner, as_primary):
1330   """Activate a block device for an instance.
1331
1332   This is run on the primary and secondary nodes for an instance.
1333
1334   @note: this function is called recursively.
1335
1336   @type disk: L{objects.Disk}
1337   @param disk: the disk we try to assemble
1338   @type owner: str
1339   @param owner: the name of the instance which owns the disk
1340   @type as_primary: boolean
1341   @param as_primary: if we should make the block device
1342       read/write
1343
1344   @return: the assembled device or None (in case no device
1345       was assembled)
1346   @raise errors.BlockDeviceError: in case there is an error
1347       during the activation of the children or the device
1348       itself
1349
1350   """
1351   children = []
1352   if disk.children:
1353     mcn = disk.ChildrenNeeded()
1354     if mcn == -1:
1355       mcn = 0 # max number of Nones allowed
1356     else:
1357       mcn = len(disk.children) - mcn # max number of Nones
1358     for chld_disk in disk.children:
1359       try:
1360         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1361       except errors.BlockDeviceError, err:
1362         if children.count(None) >= mcn:
1363           raise
1364         cdev = None
1365         logging.error("Error in child activation (but continuing): %s",
1366                       str(err))
1367       children.append(cdev)
1368
1369   if as_primary or disk.AssembleOnSecondary():
1370     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1371     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1372     result = r_dev
1373     if as_primary or disk.OpenOnSecondary():
1374       r_dev.Open()
1375     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1376                                 as_primary, disk.iv_name)
1377
1378   else:
1379     result = True
1380   return result
1381
1382
1383 def BlockdevAssemble(disk, owner, as_primary):
1384   """Activate a block device for an instance.
1385
1386   This is a wrapper over _RecursiveAssembleBD.
1387
1388   @rtype: str or boolean
1389   @return: a C{/dev/...} path for primary nodes, and
1390       C{True} for secondary nodes
1391
1392   """
1393   try:
1394     result = _RecursiveAssembleBD(disk, owner, as_primary)
1395     if isinstance(result, bdev.BlockDev):
1396       # pylint: disable-msg=E1103
1397       result = result.dev_path
1398   except errors.BlockDeviceError, err:
1399     _Fail("Error while assembling disk: %s", err, exc=True)
1400
1401   return result
1402
1403
1404 def BlockdevShutdown(disk):
1405   """Shut down a block device.
1406
1407   First, if the device is assembled (Attach() is successful), then
1408   the device is shutdown. Then the children of the device are
1409   shutdown.
1410
1411   This function is called recursively. Note that we don't cache the
1412   children or such, as oppossed to assemble, shutdown of different
1413   devices doesn't require that the upper device was active.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the description of the disk we should
1417       shutdown
1418   @rtype: None
1419
1420   """
1421   msgs = []
1422   r_dev = _RecursiveFindBD(disk)
1423   if r_dev is not None:
1424     r_path = r_dev.dev_path
1425     try:
1426       r_dev.Shutdown()
1427       DevCacheManager.RemoveCache(r_path)
1428     except errors.BlockDeviceError, err:
1429       msgs.append(str(err))
1430
1431   if disk.children:
1432     for child in disk.children:
1433       try:
1434         BlockdevShutdown(child)
1435       except RPCFail, err:
1436         msgs.append(str(err))
1437
1438   if msgs:
1439     _Fail("; ".join(msgs))
1440
1441
1442 def BlockdevAddchildren(parent_cdev, new_cdevs):
1443   """Extend a mirrored block device.
1444
1445   @type parent_cdev: L{objects.Disk}
1446   @param parent_cdev: the disk to which we should add children
1447   @type new_cdevs: list of L{objects.Disk}
1448   @param new_cdevs: the list of children which we should add
1449   @rtype: None
1450
1451   """
1452   parent_bdev = _RecursiveFindBD(parent_cdev)
1453   if parent_bdev is None:
1454     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1455   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1456   if new_bdevs.count(None) > 0:
1457     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1458   parent_bdev.AddChildren(new_bdevs)
1459
1460
1461 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1462   """Shrink a mirrored block device.
1463
1464   @type parent_cdev: L{objects.Disk}
1465   @param parent_cdev: the disk from which we should remove children
1466   @type new_cdevs: list of L{objects.Disk}
1467   @param new_cdevs: the list of children which we should remove
1468   @rtype: None
1469
1470   """
1471   parent_bdev = _RecursiveFindBD(parent_cdev)
1472   if parent_bdev is None:
1473     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1474   devs = []
1475   for disk in new_cdevs:
1476     rpath = disk.StaticDevPath()
1477     if rpath is None:
1478       bd = _RecursiveFindBD(disk)
1479       if bd is None:
1480         _Fail("Can't find device %s while removing children", disk)
1481       else:
1482         devs.append(bd.dev_path)
1483     else:
1484       if not utils.IsNormAbsPath(rpath):
1485         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1486       devs.append(rpath)
1487   parent_bdev.RemoveChildren(devs)
1488
1489
1490 def BlockdevGetmirrorstatus(disks):
1491   """Get the mirroring status of a list of devices.
1492
1493   @type disks: list of L{objects.Disk}
1494   @param disks: the list of disks which we should query
1495   @rtype: disk
1496   @return:
1497       a list of (mirror_done, estimated_time) tuples, which
1498       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1499   @raise errors.BlockDeviceError: if any of the disks cannot be
1500       found
1501
1502   """
1503   stats = []
1504   for dsk in disks:
1505     rbd = _RecursiveFindBD(dsk)
1506     if rbd is None:
1507       _Fail("Can't find device %s", dsk)
1508
1509     stats.append(rbd.CombinedSyncStatus())
1510
1511   return stats
1512
1513
1514 def _RecursiveFindBD(disk):
1515   """Check if a device is activated.
1516
1517   If so, return information about the real device.
1518
1519   @type disk: L{objects.Disk}
1520   @param disk: the disk object we need to find
1521
1522   @return: None if the device can't be found,
1523       otherwise the device instance
1524
1525   """
1526   children = []
1527   if disk.children:
1528     for chdisk in disk.children:
1529       children.append(_RecursiveFindBD(chdisk))
1530
1531   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1532
1533
1534 def _OpenRealBD(disk):
1535   """Opens the underlying block device of a disk.
1536
1537   @type disk: L{objects.Disk}
1538   @param disk: the disk object we want to open
1539
1540   """
1541   real_disk = _RecursiveFindBD(disk)
1542   if real_disk is None:
1543     _Fail("Block device '%s' is not set up", disk)
1544
1545   real_disk.Open()
1546
1547   return real_disk
1548
1549
1550 def BlockdevFind(disk):
1551   """Check if a device is activated.
1552
1553   If it is, return information about the real device.
1554
1555   @type disk: L{objects.Disk}
1556   @param disk: the disk to find
1557   @rtype: None or objects.BlockDevStatus
1558   @return: None if the disk cannot be found, otherwise a the current
1559            information
1560
1561   """
1562   try:
1563     rbd = _RecursiveFindBD(disk)
1564   except errors.BlockDeviceError, err:
1565     _Fail("Failed to find device: %s", err, exc=True)
1566
1567   if rbd is None:
1568     return None
1569
1570   return rbd.GetSyncStatus()
1571
1572
1573 def BlockdevGetsize(disks):
1574   """Computes the size of the given disks.
1575
1576   If a disk is not found, returns None instead.
1577
1578   @type disks: list of L{objects.Disk}
1579   @param disks: the list of disk to compute the size for
1580   @rtype: list
1581   @return: list with elements None if the disk cannot be found,
1582       otherwise the size
1583
1584   """
1585   result = []
1586   for cf in disks:
1587     try:
1588       rbd = _RecursiveFindBD(cf)
1589     except errors.BlockDeviceError:
1590       result.append(None)
1591       continue
1592     if rbd is None:
1593       result.append(None)
1594     else:
1595       result.append(rbd.GetActualSize())
1596   return result
1597
1598
1599 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1600   """Export a block device to a remote node.
1601
1602   @type disk: L{objects.Disk}
1603   @param disk: the description of the disk to export
1604   @type dest_node: str
1605   @param dest_node: the destination node to export to
1606   @type dest_path: str
1607   @param dest_path: the destination path on the target node
1608   @type cluster_name: str
1609   @param cluster_name: the cluster name, needed for SSH hostalias
1610   @rtype: None
1611
1612   """
1613   real_disk = _OpenRealBD(disk)
1614
1615   # the block size on the read dd is 1MiB to match our units
1616   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1617                                "dd if=%s bs=1048576 count=%s",
1618                                real_disk.dev_path, str(disk.size))
1619
1620   # we set here a smaller block size as, due to ssh buffering, more
1621   # than 64-128k will mostly ignored; we use nocreat to fail if the
1622   # device is not already there or we pass a wrong path; we use
1623   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1624   # to not buffer too much memory; this means that at best, we flush
1625   # every 64k, which will not be very fast
1626   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1627                                 " oflag=dsync", dest_path)
1628
1629   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1630                                                    constants.GANETI_RUNAS,
1631                                                    destcmd)
1632
1633   # all commands have been checked, so we're safe to combine them
1634   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1635
1636   result = utils.RunCmd(["bash", "-c", command])
1637
1638   if result.failed:
1639     _Fail("Disk copy command '%s' returned error: %s"
1640           " output: %s", command, result.fail_reason, result.output)
1641
1642
1643 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1644   """Write a file to the filesystem.
1645
1646   This allows the master to overwrite(!) a file. It will only perform
1647   the operation if the file belongs to a list of configuration files.
1648
1649   @type file_name: str
1650   @param file_name: the target file name
1651   @type data: str
1652   @param data: the new contents of the file
1653   @type mode: int
1654   @param mode: the mode to give the file (can be None)
1655   @type uid: int
1656   @param uid: the owner of the file (can be -1 for default)
1657   @type gid: int
1658   @param gid: the group of the file (can be -1 for default)
1659   @type atime: float
1660   @param atime: the atime to set on the file (can be None)
1661   @type mtime: float
1662   @param mtime: the mtime to set on the file (can be None)
1663   @rtype: None
1664
1665   """
1666   if not os.path.isabs(file_name):
1667     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1668
1669   if file_name not in _ALLOWED_UPLOAD_FILES:
1670     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1671           file_name)
1672
1673   raw_data = _Decompress(data)
1674
1675   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1676                   atime=atime, mtime=mtime)
1677
1678
1679 def WriteSsconfFiles(values):
1680   """Update all ssconf files.
1681
1682   Wrapper around the SimpleStore.WriteFiles.
1683
1684   """
1685   ssconf.SimpleStore().WriteFiles(values)
1686
1687
1688 def _ErrnoOrStr(err):
1689   """Format an EnvironmentError exception.
1690
1691   If the L{err} argument has an errno attribute, it will be looked up
1692   and converted into a textual C{E...} description. Otherwise the
1693   string representation of the error will be returned.
1694
1695   @type err: L{EnvironmentError}
1696   @param err: the exception to format
1697
1698   """
1699   if hasattr(err, 'errno'):
1700     detail = errno.errorcode[err.errno]
1701   else:
1702     detail = str(err)
1703   return detail
1704
1705
1706 def _OSOndiskAPIVersion(os_dir):
1707   """Compute and return the API version of a given OS.
1708
1709   This function will try to read the API version of the OS residing in
1710   the 'os_dir' directory.
1711
1712   @type os_dir: str
1713   @param os_dir: the directory in which we should look for the OS
1714   @rtype: tuple
1715   @return: tuple (status, data) with status denoting the validity and
1716       data holding either the vaid versions or an error message
1717
1718   """
1719   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1720
1721   try:
1722     st = os.stat(api_file)
1723   except EnvironmentError, err:
1724     return False, ("Required file '%s' not found under path %s: %s" %
1725                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1726
1727   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1728     return False, ("File '%s' in %s is not a regular file" %
1729                    (constants.OS_API_FILE, os_dir))
1730
1731   try:
1732     api_versions = utils.ReadFile(api_file).splitlines()
1733   except EnvironmentError, err:
1734     return False, ("Error while reading the API version file at %s: %s" %
1735                    (api_file, _ErrnoOrStr(err)))
1736
1737   try:
1738     api_versions = [int(version.strip()) for version in api_versions]
1739   except (TypeError, ValueError), err:
1740     return False, ("API version(s) can't be converted to integer: %s" %
1741                    str(err))
1742
1743   return True, api_versions
1744
1745
1746 def DiagnoseOS(top_dirs=None):
1747   """Compute the validity for all OSes.
1748
1749   @type top_dirs: list
1750   @param top_dirs: the list of directories in which to
1751       search (if not given defaults to
1752       L{constants.OS_SEARCH_PATH})
1753   @rtype: list of L{objects.OS}
1754   @return: a list of tuples (name, path, status, diagnose, variants)
1755       for all (potential) OSes under all search paths, where:
1756           - name is the (potential) OS name
1757           - path is the full path to the OS
1758           - status True/False is the validity of the OS
1759           - diagnose is the error message for an invalid OS, otherwise empty
1760           - variants is a list of supported OS variants, if any
1761
1762   """
1763   if top_dirs is None:
1764     top_dirs = constants.OS_SEARCH_PATH
1765
1766   result = []
1767   for dir_name in top_dirs:
1768     if os.path.isdir(dir_name):
1769       try:
1770         f_names = utils.ListVisibleFiles(dir_name)
1771       except EnvironmentError, err:
1772         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1773         break
1774       for name in f_names:
1775         os_path = utils.PathJoin(dir_name, name)
1776         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1777         if status:
1778           diagnose = ""
1779           variants = os_inst.supported_variants
1780         else:
1781           diagnose = os_inst
1782           variants = []
1783         result.append((name, os_path, status, diagnose, variants))
1784
1785   return result
1786
1787
1788 def _TryOSFromDisk(name, base_dir=None):
1789   """Create an OS instance from disk.
1790
1791   This function will return an OS instance if the given name is a
1792   valid OS name.
1793
1794   @type base_dir: string
1795   @keyword base_dir: Base directory containing OS installations.
1796                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1797   @rtype: tuple
1798   @return: success and either the OS instance if we find a valid one,
1799       or error message
1800
1801   """
1802   if base_dir is None:
1803     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1804   else:
1805     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1806
1807   if os_dir is None:
1808     return False, "Directory for OS %s not found in search path" % name
1809
1810   status, api_versions = _OSOndiskAPIVersion(os_dir)
1811   if not status:
1812     # push the error up
1813     return status, api_versions
1814
1815   if not constants.OS_API_VERSIONS.intersection(api_versions):
1816     return False, ("API version mismatch for path '%s': found %s, want %s." %
1817                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1818
1819   # OS Files dictionary, we will populate it with the absolute path names
1820   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1821
1822   if max(api_versions) >= constants.OS_API_V15:
1823     os_files[constants.OS_VARIANTS_FILE] = ''
1824
1825   for filename in os_files:
1826     os_files[filename] = utils.PathJoin(os_dir, filename)
1827
1828     try:
1829       st = os.stat(os_files[filename])
1830     except EnvironmentError, err:
1831       return False, ("File '%s' under path '%s' is missing (%s)" %
1832                      (filename, os_dir, _ErrnoOrStr(err)))
1833
1834     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1835       return False, ("File '%s' under path '%s' is not a regular file" %
1836                      (filename, os_dir))
1837
1838     if filename in constants.OS_SCRIPTS:
1839       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1840         return False, ("File '%s' under path '%s' is not executable" %
1841                        (filename, os_dir))
1842
1843   variants = None
1844   if constants.OS_VARIANTS_FILE in os_files:
1845     variants_file = os_files[constants.OS_VARIANTS_FILE]
1846     try:
1847       variants = utils.ReadFile(variants_file).splitlines()
1848     except EnvironmentError, err:
1849       return False, ("Error while reading the OS variants file at %s: %s" %
1850                      (variants_file, _ErrnoOrStr(err)))
1851     if not variants:
1852       return False, ("No supported os variant found")
1853
1854   os_obj = objects.OS(name=name, path=os_dir,
1855                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1856                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1857                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1858                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1859                       supported_variants=variants,
1860                       api_versions=api_versions)
1861   return True, os_obj
1862
1863
1864 def OSFromDisk(name, base_dir=None):
1865   """Create an OS instance from disk.
1866
1867   This function will return an OS instance if the given name is a
1868   valid OS name. Otherwise, it will raise an appropriate
1869   L{RPCFail} exception, detailing why this is not a valid OS.
1870
1871   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1872   an exception but returns true/false status data.
1873
1874   @type base_dir: string
1875   @keyword base_dir: Base directory containing OS installations.
1876                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1877   @rtype: L{objects.OS}
1878   @return: the OS instance if we find a valid one
1879   @raise RPCFail: if we don't find a valid OS
1880
1881   """
1882   name_only = name.split("+", 1)[0]
1883   status, payload = _TryOSFromDisk(name_only, base_dir)
1884
1885   if not status:
1886     _Fail(payload)
1887
1888   return payload
1889
1890
1891 def OSEnvironment(instance, inst_os, debug=0):
1892   """Calculate the environment for an os script.
1893
1894   @type instance: L{objects.Instance}
1895   @param instance: target instance for the os script run
1896   @type inst_os: L{objects.OS}
1897   @param inst_os: operating system for which the environment is being built
1898   @type debug: integer
1899   @param debug: debug level (0 or 1, for OS Api 10)
1900   @rtype: dict
1901   @return: dict of environment variables
1902   @raise errors.BlockDeviceError: if the block device
1903       cannot be found
1904
1905   """
1906   result = {}
1907   api_version = \
1908     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1909   result['OS_API_VERSION'] = '%d' % api_version
1910   result['INSTANCE_NAME'] = instance.name
1911   result['INSTANCE_OS'] = instance.os
1912   result['HYPERVISOR'] = instance.hypervisor
1913   result['DISK_COUNT'] = '%d' % len(instance.disks)
1914   result['NIC_COUNT'] = '%d' % len(instance.nics)
1915   result['DEBUG_LEVEL'] = '%d' % debug
1916   if api_version >= constants.OS_API_V15:
1917     try:
1918       variant = instance.os.split('+', 1)[1]
1919     except IndexError:
1920       variant = inst_os.supported_variants[0]
1921     result['OS_VARIANT'] = variant
1922   for idx, disk in enumerate(instance.disks):
1923     real_disk = _OpenRealBD(disk)
1924     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1925     result['DISK_%d_ACCESS' % idx] = disk.mode
1926     if constants.HV_DISK_TYPE in instance.hvparams:
1927       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1928         instance.hvparams[constants.HV_DISK_TYPE]
1929     if disk.dev_type in constants.LDS_BLOCK:
1930       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1931     elif disk.dev_type == constants.LD_FILE:
1932       result['DISK_%d_BACKEND_TYPE' % idx] = \
1933         'file:%s' % disk.physical_id[0]
1934   for idx, nic in enumerate(instance.nics):
1935     result['NIC_%d_MAC' % idx] = nic.mac
1936     if nic.ip:
1937       result['NIC_%d_IP' % idx] = nic.ip
1938     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1939     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1940       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1941     if nic.nicparams[constants.NIC_LINK]:
1942       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1943     if constants.HV_NIC_TYPE in instance.hvparams:
1944       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1945         instance.hvparams[constants.HV_NIC_TYPE]
1946
1947   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1948     for key, value in source.items():
1949       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1950
1951   return result
1952
1953
1954 def BlockdevGrow(disk, amount):
1955   """Grow a stack of block devices.
1956
1957   This function is called recursively, with the childrens being the
1958   first ones to resize.
1959
1960   @type disk: L{objects.Disk}
1961   @param disk: the disk to be grown
1962   @rtype: (status, result)
1963   @return: a tuple with the status of the operation
1964       (True/False), and the errors message if status
1965       is False
1966
1967   """
1968   r_dev = _RecursiveFindBD(disk)
1969   if r_dev is None:
1970     _Fail("Cannot find block device %s", disk)
1971
1972   try:
1973     r_dev.Grow(amount)
1974   except errors.BlockDeviceError, err:
1975     _Fail("Failed to grow block device: %s", err, exc=True)
1976
1977
1978 def BlockdevSnapshot(disk):
1979   """Create a snapshot copy of a block device.
1980
1981   This function is called recursively, and the snapshot is actually created
1982   just for the leaf lvm backend device.
1983
1984   @type disk: L{objects.Disk}
1985   @param disk: the disk to be snapshotted
1986   @rtype: string
1987   @return: snapshot disk path
1988
1989   """
1990   if disk.dev_type == constants.LD_DRBD8:
1991     if not disk.children:
1992       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
1993             disk.unique_id)
1994     return BlockdevSnapshot(disk.children[0])
1995   elif disk.dev_type == constants.LD_LV:
1996     r_dev = _RecursiveFindBD(disk)
1997     if r_dev is not None:
1998       # FIXME: choose a saner value for the snapshot size
1999       # let's stay on the safe side and ask for the full size, for now
2000       return r_dev.Snapshot(disk.size)
2001     else:
2002       _Fail("Cannot find block device %s", disk)
2003   else:
2004     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2005           disk.unique_id, disk.dev_type)
2006
2007
2008 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
2009   """Export a block device snapshot to a remote node.
2010
2011   @type disk: L{objects.Disk}
2012   @param disk: the description of the disk to export
2013   @type dest_node: str
2014   @param dest_node: the destination node to export to
2015   @type instance: L{objects.Instance}
2016   @param instance: the instance object to whom the disk belongs
2017   @type cluster_name: str
2018   @param cluster_name: the cluster name, needed for SSH hostalias
2019   @type idx: int
2020   @param idx: the index of the disk in the instance's disk list,
2021       used to export to the OS scripts environment
2022   @type debug: integer
2023   @param debug: debug level, passed to the OS scripts
2024   @rtype: None
2025
2026   """
2027   inst_os = OSFromDisk(instance.os)
2028   export_env = OSEnvironment(instance, inst_os, debug)
2029
2030   export_script = inst_os.export_script
2031
2032   logfile = _InstanceLogName("export", inst_os.name, instance.name)
2033
2034   real_disk = _OpenRealBD(disk)
2035
2036   export_env['EXPORT_DEVICE'] = real_disk.dev_path
2037   export_env['EXPORT_INDEX'] = str(idx)
2038
2039   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2040   destfile = disk.physical_id[1]
2041
2042   # the target command is built out of three individual commands,
2043   # which are joined by pipes; we check each individual command for
2044   # valid parameters
2045   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
2046                                inst_os.path, export_script, logfile)
2047
2048   comprcmd = "gzip"
2049
2050   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
2051                                 destdir, utils.PathJoin(destdir, destfile))
2052   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2053                                                    constants.GANETI_RUNAS,
2054                                                    destcmd)
2055
2056   # all commands have been checked, so we're safe to combine them
2057   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
2058
2059   result = utils.RunCmd(["bash", "-c", command], env=export_env)
2060
2061   if result.failed:
2062     _Fail("OS snapshot export command '%s' returned error: %s"
2063           " output: %s", command, result.fail_reason, result.output)
2064
2065
2066 def FinalizeExport(instance, snap_disks):
2067   """Write out the export configuration information.
2068
2069   @type instance: L{objects.Instance}
2070   @param instance: the instance which we export, used for
2071       saving configuration
2072   @type snap_disks: list of L{objects.Disk}
2073   @param snap_disks: list of snapshot block devices, which
2074       will be used to get the actual name of the dump file
2075
2076   @rtype: None
2077
2078   """
2079   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2080   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2081
2082   config = objects.SerializableConfigParser()
2083
2084   config.add_section(constants.INISECT_EXP)
2085   config.set(constants.INISECT_EXP, 'version', '0')
2086   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2087   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2088   config.set(constants.INISECT_EXP, 'os', instance.os)
2089   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2090
2091   config.add_section(constants.INISECT_INS)
2092   config.set(constants.INISECT_INS, 'name', instance.name)
2093   config.set(constants.INISECT_INS, 'memory', '%d' %
2094              instance.beparams[constants.BE_MEMORY])
2095   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2096              instance.beparams[constants.BE_VCPUS])
2097   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2098   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2099
2100   nic_total = 0
2101   for nic_count, nic in enumerate(instance.nics):
2102     nic_total += 1
2103     config.set(constants.INISECT_INS, 'nic%d_mac' %
2104                nic_count, '%s' % nic.mac)
2105     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2106     for param in constants.NICS_PARAMETER_TYPES:
2107       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2108                  '%s' % nic.nicparams.get(param, None))
2109   # TODO: redundant: on load can read nics until it doesn't exist
2110   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2111
2112   disk_total = 0
2113   for disk_count, disk in enumerate(snap_disks):
2114     if disk:
2115       disk_total += 1
2116       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2117                  ('%s' % disk.iv_name))
2118       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2119                  ('%s' % disk.physical_id[1]))
2120       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2121                  ('%d' % disk.size))
2122
2123   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2124
2125   # New-style hypervisor/backend parameters
2126
2127   config.add_section(constants.INISECT_HYP)
2128   for name, value in instance.hvparams.items():
2129     if name not in constants.HVC_GLOBALS:
2130       config.set(constants.INISECT_HYP, name, str(value))
2131
2132   config.add_section(constants.INISECT_BEP)
2133   for name, value in instance.beparams.items():
2134     config.set(constants.INISECT_BEP, name, str(value))
2135
2136   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2137                   data=config.Dumps())
2138   shutil.rmtree(finaldestdir, ignore_errors=True)
2139   shutil.move(destdir, finaldestdir)
2140
2141
2142 def ExportInfo(dest):
2143   """Get export configuration information.
2144
2145   @type dest: str
2146   @param dest: directory containing the export
2147
2148   @rtype: L{objects.SerializableConfigParser}
2149   @return: a serializable config file containing the
2150       export info
2151
2152   """
2153   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2154
2155   config = objects.SerializableConfigParser()
2156   config.read(cff)
2157
2158   if (not config.has_section(constants.INISECT_EXP) or
2159       not config.has_section(constants.INISECT_INS)):
2160     _Fail("Export info file doesn't have the required fields")
2161
2162   return config.Dumps()
2163
2164
2165 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2166   """Import an os image into an instance.
2167
2168   @type instance: L{objects.Instance}
2169   @param instance: instance to import the disks into
2170   @type src_node: string
2171   @param src_node: source node for the disk images
2172   @type src_images: list of string
2173   @param src_images: absolute paths of the disk images
2174   @type debug: integer
2175   @param debug: debug level, passed to the OS scripts
2176   @rtype: list of boolean
2177   @return: each boolean represent the success of importing the n-th disk
2178
2179   """
2180   inst_os = OSFromDisk(instance.os)
2181   import_env = OSEnvironment(instance, inst_os, debug)
2182   import_script = inst_os.import_script
2183
2184   logfile = _InstanceLogName("import", instance.os, instance.name)
2185
2186   comprcmd = "gunzip"
2187   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2188                                import_script, logfile)
2189
2190   final_result = []
2191   for idx, image in enumerate(src_images):
2192     if image:
2193       destcmd = utils.BuildShellCmd('cat %s', image)
2194       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2195                                                        constants.GANETI_RUNAS,
2196                                                        destcmd)
2197       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2198       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2199       import_env['IMPORT_INDEX'] = str(idx)
2200       result = utils.RunCmd(command, env=import_env)
2201       if result.failed:
2202         logging.error("Disk import command '%s' returned error: %s"
2203                       " output: %s", command, result.fail_reason,
2204                       result.output)
2205         final_result.append("error importing disk %d: %s, %s" %
2206                             (idx, result.fail_reason, result.output[-100]))
2207
2208   if final_result:
2209     _Fail("; ".join(final_result), log=False)
2210
2211
2212 def ListExports():
2213   """Return a list of exports currently available on this machine.
2214
2215   @rtype: list
2216   @return: list of the exports
2217
2218   """
2219   if os.path.isdir(constants.EXPORT_DIR):
2220     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2221   else:
2222     _Fail("No exports directory")
2223
2224
2225 def RemoveExport(export):
2226   """Remove an existing export from the node.
2227
2228   @type export: str
2229   @param export: the name of the export to remove
2230   @rtype: None
2231
2232   """
2233   target = utils.PathJoin(constants.EXPORT_DIR, export)
2234
2235   try:
2236     shutil.rmtree(target)
2237   except EnvironmentError, err:
2238     _Fail("Error while removing the export: %s", err, exc=True)
2239
2240
2241 def BlockdevRename(devlist):
2242   """Rename a list of block devices.
2243
2244   @type devlist: list of tuples
2245   @param devlist: list of tuples of the form  (disk,
2246       new_logical_id, new_physical_id); disk is an
2247       L{objects.Disk} object describing the current disk,
2248       and new logical_id/physical_id is the name we
2249       rename it to
2250   @rtype: boolean
2251   @return: True if all renames succeeded, False otherwise
2252
2253   """
2254   msgs = []
2255   result = True
2256   for disk, unique_id in devlist:
2257     dev = _RecursiveFindBD(disk)
2258     if dev is None:
2259       msgs.append("Can't find device %s in rename" % str(disk))
2260       result = False
2261       continue
2262     try:
2263       old_rpath = dev.dev_path
2264       dev.Rename(unique_id)
2265       new_rpath = dev.dev_path
2266       if old_rpath != new_rpath:
2267         DevCacheManager.RemoveCache(old_rpath)
2268         # FIXME: we should add the new cache information here, like:
2269         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2270         # but we don't have the owner here - maybe parse from existing
2271         # cache? for now, we only lose lvm data when we rename, which
2272         # is less critical than DRBD or MD
2273     except errors.BlockDeviceError, err:
2274       msgs.append("Can't rename device '%s' to '%s': %s" %
2275                   (dev, unique_id, err))
2276       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2277       result = False
2278   if not result:
2279     _Fail("; ".join(msgs))
2280
2281
2282 def _TransformFileStorageDir(file_storage_dir):
2283   """Checks whether given file_storage_dir is valid.
2284
2285   Checks wheter the given file_storage_dir is within the cluster-wide
2286   default file_storage_dir stored in SimpleStore. Only paths under that
2287   directory are allowed.
2288
2289   @type file_storage_dir: str
2290   @param file_storage_dir: the path to check
2291
2292   @return: the normalized path if valid, None otherwise
2293
2294   """
2295   if not constants.ENABLE_FILE_STORAGE:
2296     _Fail("File storage disabled at configure time")
2297   cfg = _GetConfig()
2298   file_storage_dir = os.path.normpath(file_storage_dir)
2299   base_file_storage_dir = cfg.GetFileStorageDir()
2300   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2301       base_file_storage_dir):
2302     _Fail("File storage directory '%s' is not under base file"
2303           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2304   return file_storage_dir
2305
2306
2307 def CreateFileStorageDir(file_storage_dir):
2308   """Create file storage directory.
2309
2310   @type file_storage_dir: str
2311   @param file_storage_dir: directory to create
2312
2313   @rtype: tuple
2314   @return: tuple with first element a boolean indicating wheter dir
2315       creation was successful or not
2316
2317   """
2318   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2319   if os.path.exists(file_storage_dir):
2320     if not os.path.isdir(file_storage_dir):
2321       _Fail("Specified storage dir '%s' is not a directory",
2322             file_storage_dir)
2323   else:
2324     try:
2325       os.makedirs(file_storage_dir, 0750)
2326     except OSError, err:
2327       _Fail("Cannot create file storage directory '%s': %s",
2328             file_storage_dir, err, exc=True)
2329
2330
2331 def RemoveFileStorageDir(file_storage_dir):
2332   """Remove file storage directory.
2333
2334   Remove it only if it's empty. If not log an error and return.
2335
2336   @type file_storage_dir: str
2337   @param file_storage_dir: the directory we should cleanup
2338   @rtype: tuple (success,)
2339   @return: tuple of one element, C{success}, denoting
2340       whether the operation was successful
2341
2342   """
2343   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2344   if os.path.exists(file_storage_dir):
2345     if not os.path.isdir(file_storage_dir):
2346       _Fail("Specified Storage directory '%s' is not a directory",
2347             file_storage_dir)
2348     # deletes dir only if empty, otherwise we want to fail the rpc call
2349     try:
2350       os.rmdir(file_storage_dir)
2351     except OSError, err:
2352       _Fail("Cannot remove file storage directory '%s': %s",
2353             file_storage_dir, err)
2354
2355
2356 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2357   """Rename the file storage directory.
2358
2359   @type old_file_storage_dir: str
2360   @param old_file_storage_dir: the current path
2361   @type new_file_storage_dir: str
2362   @param new_file_storage_dir: the name we should rename to
2363   @rtype: tuple (success,)
2364   @return: tuple of one element, C{success}, denoting
2365       whether the operation was successful
2366
2367   """
2368   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2369   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2370   if not os.path.exists(new_file_storage_dir):
2371     if os.path.isdir(old_file_storage_dir):
2372       try:
2373         os.rename(old_file_storage_dir, new_file_storage_dir)
2374       except OSError, err:
2375         _Fail("Cannot rename '%s' to '%s': %s",
2376               old_file_storage_dir, new_file_storage_dir, err)
2377     else:
2378       _Fail("Specified storage dir '%s' is not a directory",
2379             old_file_storage_dir)
2380   else:
2381     if os.path.exists(old_file_storage_dir):
2382       _Fail("Cannot rename '%s' to '%s': both locations exist",
2383             old_file_storage_dir, new_file_storage_dir)
2384
2385
2386 def _EnsureJobQueueFile(file_name):
2387   """Checks whether the given filename is in the queue directory.
2388
2389   @type file_name: str
2390   @param file_name: the file name we should check
2391   @rtype: None
2392   @raises RPCFail: if the file is not valid
2393
2394   """
2395   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2396   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2397
2398   if not result:
2399     _Fail("Passed job queue file '%s' does not belong to"
2400           " the queue directory '%s'", file_name, queue_dir)
2401
2402
2403 def JobQueueUpdate(file_name, content):
2404   """Updates a file in the queue directory.
2405
2406   This is just a wrapper over L{utils.WriteFile}, with proper
2407   checking.
2408
2409   @type file_name: str
2410   @param file_name: the job file name
2411   @type content: str
2412   @param content: the new job contents
2413   @rtype: boolean
2414   @return: the success of the operation
2415
2416   """
2417   _EnsureJobQueueFile(file_name)
2418
2419   # Write and replace the file atomically
2420   utils.WriteFile(file_name, data=_Decompress(content))
2421
2422
2423 def JobQueueRename(old, new):
2424   """Renames a job queue file.
2425
2426   This is just a wrapper over os.rename with proper checking.
2427
2428   @type old: str
2429   @param old: the old (actual) file name
2430   @type new: str
2431   @param new: the desired file name
2432   @rtype: tuple
2433   @return: the success of the operation and payload
2434
2435   """
2436   _EnsureJobQueueFile(old)
2437   _EnsureJobQueueFile(new)
2438
2439   utils.RenameFile(old, new, mkdir=True)
2440
2441
2442 def JobQueueSetDrainFlag(drain_flag):
2443   """Set the drain flag for the queue.
2444
2445   This will set or unset the queue drain flag.
2446
2447   @type drain_flag: boolean
2448   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2449   @rtype: truple
2450   @return: always True, None
2451   @warning: the function always returns True
2452
2453   """
2454   if drain_flag:
2455     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2456   else:
2457     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2458
2459
2460 def BlockdevClose(instance_name, disks):
2461   """Closes the given block devices.
2462
2463   This means they will be switched to secondary mode (in case of
2464   DRBD).
2465
2466   @param instance_name: if the argument is not empty, the symlinks
2467       of this instance will be removed
2468   @type disks: list of L{objects.Disk}
2469   @param disks: the list of disks to be closed
2470   @rtype: tuple (success, message)
2471   @return: a tuple of success and message, where success
2472       indicates the succes of the operation, and message
2473       which will contain the error details in case we
2474       failed
2475
2476   """
2477   bdevs = []
2478   for cf in disks:
2479     rd = _RecursiveFindBD(cf)
2480     if rd is None:
2481       _Fail("Can't find device %s", cf)
2482     bdevs.append(rd)
2483
2484   msg = []
2485   for rd in bdevs:
2486     try:
2487       rd.Close()
2488     except errors.BlockDeviceError, err:
2489       msg.append(str(err))
2490   if msg:
2491     _Fail("Can't make devices secondary: %s", ",".join(msg))
2492   else:
2493     if instance_name:
2494       _RemoveBlockDevLinks(instance_name, disks)
2495
2496
2497 def ValidateHVParams(hvname, hvparams):
2498   """Validates the given hypervisor parameters.
2499
2500   @type hvname: string
2501   @param hvname: the hypervisor name
2502   @type hvparams: dict
2503   @param hvparams: the hypervisor parameters to be validated
2504   @rtype: None
2505
2506   """
2507   try:
2508     hv_type = hypervisor.GetHypervisor(hvname)
2509     hv_type.ValidateParameters(hvparams)
2510   except errors.HypervisorError, err:
2511     _Fail(str(err), log=False)
2512
2513
2514 def DemoteFromMC():
2515   """Demotes the current node from master candidate role.
2516
2517   """
2518   # try to ensure we're not the master by mistake
2519   master, myself = ssconf.GetMasterAndMyself()
2520   if master == myself:
2521     _Fail("ssconf status shows I'm the master node, will not demote")
2522
2523   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2524   if not result.failed:
2525     _Fail("The master daemon is running, will not demote")
2526
2527   try:
2528     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2529       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2530   except EnvironmentError, err:
2531     if err.errno != errno.ENOENT:
2532       _Fail("Error while backing up cluster file: %s", err, exc=True)
2533
2534   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2535
2536
2537 def _FindDisks(nodes_ip, disks):
2538   """Sets the physical ID on disks and returns the block devices.
2539
2540   """
2541   # set the correct physical ID
2542   my_name = utils.HostInfo().name
2543   for cf in disks:
2544     cf.SetPhysicalID(my_name, nodes_ip)
2545
2546   bdevs = []
2547
2548   for cf in disks:
2549     rd = _RecursiveFindBD(cf)
2550     if rd is None:
2551       _Fail("Can't find device %s", cf)
2552     bdevs.append(rd)
2553   return bdevs
2554
2555
2556 def DrbdDisconnectNet(nodes_ip, disks):
2557   """Disconnects the network on a list of drbd devices.
2558
2559   """
2560   bdevs = _FindDisks(nodes_ip, disks)
2561
2562   # disconnect disks
2563   for rd in bdevs:
2564     try:
2565       rd.DisconnectNet()
2566     except errors.BlockDeviceError, err:
2567       _Fail("Can't change network configuration to standalone mode: %s",
2568             err, exc=True)
2569
2570
2571 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2572   """Attaches the network on a list of drbd devices.
2573
2574   """
2575   bdevs = _FindDisks(nodes_ip, disks)
2576
2577   if multimaster:
2578     for idx, rd in enumerate(bdevs):
2579       try:
2580         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2581       except EnvironmentError, err:
2582         _Fail("Can't create symlink: %s", err)
2583   # reconnect disks, switch to new master configuration and if
2584   # needed primary mode
2585   for rd in bdevs:
2586     try:
2587       rd.AttachNet(multimaster)
2588     except errors.BlockDeviceError, err:
2589       _Fail("Can't change network configuration: %s", err)
2590
2591   # wait until the disks are connected; we need to retry the re-attach
2592   # if the device becomes standalone, as this might happen if the one
2593   # node disconnects and reconnects in a different mode before the
2594   # other node reconnects; in this case, one or both of the nodes will
2595   # decide it has wrong configuration and switch to standalone
2596
2597   def _Attach():
2598     all_connected = True
2599
2600     for rd in bdevs:
2601       stats = rd.GetProcStatus()
2602
2603       all_connected = (all_connected and
2604                        (stats.is_connected or stats.is_in_resync))
2605
2606       if stats.is_standalone:
2607         # peer had different config info and this node became
2608         # standalone, even though this should not happen with the
2609         # new staged way of changing disk configs
2610         try:
2611           rd.AttachNet(multimaster)
2612         except errors.BlockDeviceError, err:
2613           _Fail("Can't change network configuration: %s", err)
2614
2615     if not all_connected:
2616       raise utils.RetryAgain()
2617
2618   try:
2619     # Start with a delay of 100 miliseconds and go up to 5 seconds
2620     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2621   except utils.RetryTimeout:
2622     _Fail("Timeout in disk reconnecting")
2623
2624   if multimaster:
2625     # change to primary mode
2626     for rd in bdevs:
2627       try:
2628         rd.Open()
2629       except errors.BlockDeviceError, err:
2630         _Fail("Can't change to primary mode: %s", err)
2631
2632
2633 def DrbdWaitSync(nodes_ip, disks):
2634   """Wait until DRBDs have synchronized.
2635
2636   """
2637   def _helper(rd):
2638     stats = rd.GetProcStatus()
2639     if not (stats.is_connected or stats.is_in_resync):
2640       raise utils.RetryAgain()
2641     return stats
2642
2643   bdevs = _FindDisks(nodes_ip, disks)
2644
2645   min_resync = 100
2646   alldone = True
2647   for rd in bdevs:
2648     try:
2649       # poll each second for 15 seconds
2650       stats = utils.Retry(_helper, 1, 15, args=[rd])
2651     except utils.RetryTimeout:
2652       stats = rd.GetProcStatus()
2653       # last check
2654       if not (stats.is_connected or stats.is_in_resync):
2655         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2656     alldone = alldone and (not stats.is_in_resync)
2657     if stats.sync_percent is not None:
2658       min_resync = min(min_resync, stats.sync_percent)
2659
2660   return (alldone, min_resync)
2661
2662
2663 def PowercycleNode(hypervisor_type):
2664   """Hard-powercycle the node.
2665
2666   Because we need to return first, and schedule the powercycle in the
2667   background, we won't be able to report failures nicely.
2668
2669   """
2670   hyper = hypervisor.GetHypervisor(hypervisor_type)
2671   try:
2672     pid = os.fork()
2673   except OSError:
2674     # if we can't fork, we'll pretend that we're in the child process
2675     pid = 0
2676   if pid > 0:
2677     return "Reboot scheduled in 5 seconds"
2678   # ensure the child is running on ram
2679   try:
2680     utils.Mlockall()
2681   except Exception:
2682     pass
2683   time.sleep(5)
2684   hyper.PowercycleNode()
2685
2686
2687 class HooksRunner(object):
2688   """Hook runner.
2689
2690   This class is instantiated on the node side (ganeti-noded) and not
2691   on the master side.
2692
2693   """
2694   def __init__(self, hooks_base_dir=None):
2695     """Constructor for hooks runner.
2696
2697     @type hooks_base_dir: str or None
2698     @param hooks_base_dir: if not None, this overrides the
2699         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2700
2701     """
2702     if hooks_base_dir is None:
2703       hooks_base_dir = constants.HOOKS_BASE_DIR
2704     # yeah, _BASE_DIR is not valid for attributes, we use it like a
2705     # constant
2706     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
2707
2708   def RunHooks(self, hpath, phase, env):
2709     """Run the scripts in the hooks directory.
2710
2711     @type hpath: str
2712     @param hpath: the path to the hooks directory which
2713         holds the scripts
2714     @type phase: str
2715     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2716         L{constants.HOOKS_PHASE_POST}
2717     @type env: dict
2718     @param env: dictionary with the environment for the hook
2719     @rtype: list
2720     @return: list of 3-element tuples:
2721       - script path
2722       - script result, either L{constants.HKR_SUCCESS} or
2723         L{constants.HKR_FAIL}
2724       - output of the script
2725
2726     @raise errors.ProgrammerError: for invalid input
2727         parameters
2728
2729     """
2730     if phase == constants.HOOKS_PHASE_PRE:
2731       suffix = "pre"
2732     elif phase == constants.HOOKS_PHASE_POST:
2733       suffix = "post"
2734     else:
2735       _Fail("Unknown hooks phase '%s'", phase)
2736
2737
2738     subdir = "%s-%s.d" % (hpath, suffix)
2739     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
2740
2741     results = []
2742
2743     if not os.path.isdir(dir_name):
2744       # for non-existing/non-dirs, we simply exit instead of logging a
2745       # warning at every operation
2746       return results
2747
2748     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
2749
2750     for (relname, relstatus, runresult)  in runparts_results:
2751       if relstatus == constants.RUNPARTS_SKIP:
2752         rrval = constants.HKR_SKIP
2753         output = ""
2754       elif relstatus == constants.RUNPARTS_ERR:
2755         rrval = constants.HKR_FAIL
2756         output = "Hook script execution error: %s" % runresult
2757       elif relstatus == constants.RUNPARTS_RUN:
2758         if runresult.failed:
2759           rrval = constants.HKR_FAIL
2760         else:
2761           rrval = constants.HKR_SUCCESS
2762         output = utils.SafeEncode(runresult.output.strip())
2763       results.append(("%s/%s" % (subdir, relname), rrval, output))
2764
2765     return results
2766
2767
2768 class IAllocatorRunner(object):
2769   """IAllocator runner.
2770
2771   This class is instantiated on the node side (ganeti-noded) and not on
2772   the master side.
2773
2774   """
2775   @staticmethod
2776   def Run(name, idata):
2777     """Run an iallocator script.
2778
2779     @type name: str
2780     @param name: the iallocator script name
2781     @type idata: str
2782     @param idata: the allocator input data
2783
2784     @rtype: tuple
2785     @return: two element tuple of:
2786        - status
2787        - either error message or stdout of allocator (for success)
2788
2789     """
2790     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2791                                   os.path.isfile)
2792     if alloc_script is None:
2793       _Fail("iallocator module '%s' not found in the search path", name)
2794
2795     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2796     try:
2797       os.write(fd, idata)
2798       os.close(fd)
2799       result = utils.RunCmd([alloc_script, fin_name])
2800       if result.failed:
2801         _Fail("iallocator module '%s' failed: %s, output '%s'",
2802               name, result.fail_reason, result.output)
2803     finally:
2804       os.unlink(fin_name)
2805
2806     return result.stdout
2807
2808
2809 class DevCacheManager(object):
2810   """Simple class for managing a cache of block device information.
2811
2812   """
2813   _DEV_PREFIX = "/dev/"
2814   _ROOT_DIR = constants.BDEV_CACHE_DIR
2815
2816   @classmethod
2817   def _ConvertPath(cls, dev_path):
2818     """Converts a /dev/name path to the cache file name.
2819
2820     This replaces slashes with underscores and strips the /dev
2821     prefix. It then returns the full path to the cache file.
2822
2823     @type dev_path: str
2824     @param dev_path: the C{/dev/} path name
2825     @rtype: str
2826     @return: the converted path name
2827
2828     """
2829     if dev_path.startswith(cls._DEV_PREFIX):
2830       dev_path = dev_path[len(cls._DEV_PREFIX):]
2831     dev_path = dev_path.replace("/", "_")
2832     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2833     return fpath
2834
2835   @classmethod
2836   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2837     """Updates the cache information for a given device.
2838
2839     @type dev_path: str
2840     @param dev_path: the pathname of the device
2841     @type owner: str
2842     @param owner: the owner (instance name) of the device
2843     @type on_primary: bool
2844     @param on_primary: whether this is the primary
2845         node nor not
2846     @type iv_name: str
2847     @param iv_name: the instance-visible name of the
2848         device, as in objects.Disk.iv_name
2849
2850     @rtype: None
2851
2852     """
2853     if dev_path is None:
2854       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2855       return
2856     fpath = cls._ConvertPath(dev_path)
2857     if on_primary:
2858       state = "primary"
2859     else:
2860       state = "secondary"
2861     if iv_name is None:
2862       iv_name = "not_visible"
2863     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2864     try:
2865       utils.WriteFile(fpath, data=fdata)
2866     except EnvironmentError, err:
2867       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2868
2869   @classmethod
2870   def RemoveCache(cls, dev_path):
2871     """Remove data for a dev_path.
2872
2873     This is just a wrapper over L{utils.RemoveFile} with a converted
2874     path name and logging.
2875
2876     @type dev_path: str
2877     @param dev_path: the pathname of the device
2878
2879     @rtype: None
2880
2881     """
2882     if dev_path is None:
2883       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2884       return
2885     fpath = cls._ConvertPath(dev_path)
2886     try:
2887       utils.RemoveFile(fpath)
2888     except EnvironmentError, err:
2889       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)