hypervisors: change MigrateInstance API
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55
56
57 class RPCFail(Exception):
58   """Class denoting RPC failure.
59
60   Its argument is the error message.
61
62   """
63
64
65 def _Fail(msg, *args, **kwargs):
66   """Log an error and the raise an RPCFail exception.
67
68   This exception is then handled specially in the ganeti daemon and
69   turned into a 'failed' return type. As such, this function is a
70   useful shortcut for logging the error and returning it to the master
71   daemon.
72
73   @type msg: string
74   @param msg: the text of the exception
75   @raise RPCFail
76
77   """
78   if args:
79     msg = msg % args
80   if "log" not in kwargs or kwargs["log"]: # if we should log this error
81     if "exc" in kwargs and kwargs["exc"]:
82       logging.exception(msg)
83     else:
84       logging.error(msg)
85   raise RPCFail(msg)
86
87
88 def _GetConfig():
89   """Simple wrapper to return a SimpleStore.
90
91   @rtype: L{ssconf.SimpleStore}
92   @return: a SimpleStore instance
93
94   """
95   return ssconf.SimpleStore()
96
97
98 def _GetSshRunner(cluster_name):
99   """Simple wrapper to return an SshRunner.
100
101   @type cluster_name: str
102   @param cluster_name: the cluster name, which is needed
103       by the SshRunner constructor
104   @rtype: L{ssh.SshRunner}
105   @return: an SshRunner instance
106
107   """
108   return ssh.SshRunner(cluster_name)
109
110
111 def _Decompress(data):
112   """Unpacks data compressed by the RPC client.
113
114   @type data: list or tuple
115   @param data: Data sent by RPC client
116   @rtype: str
117   @return: Decompressed data
118
119   """
120   assert isinstance(data, (list, tuple))
121   assert len(data) == 2
122   (encoding, content) = data
123   if encoding == constants.RPC_ENCODING_NONE:
124     return content
125   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126     return zlib.decompress(base64.b64decode(content))
127   else:
128     raise AssertionError("Unknown data encoding")
129
130
131 def _CleanDirectory(path, exclude=None):
132   """Removes all regular files in a directory.
133
134   @type path: str
135   @param path: the directory to clean
136   @type exclude: list
137   @param exclude: list of files to be excluded, defaults
138       to the empty list
139
140   """
141   if not os.path.isdir(path):
142     return
143   if exclude is None:
144     exclude = []
145   else:
146     # Normalize excluded paths
147     exclude = [os.path.normpath(i) for i in exclude]
148
149   for rel_name in utils.ListVisibleFiles(path):
150     full_name = os.path.normpath(os.path.join(path, rel_name))
151     if full_name in exclude:
152       continue
153     if os.path.isfile(full_name) and not os.path.islink(full_name):
154       utils.RemoveFile(full_name)
155
156
157 def _BuildUploadFileList():
158   """Build the list of allowed upload files.
159
160   This is abstracted so that it's built only once at module import time.
161
162   """
163   allowed_files = set([
164     constants.CLUSTER_CONF_FILE,
165     constants.ETC_HOSTS,
166     constants.SSH_KNOWN_HOSTS_FILE,
167     constants.VNC_PASSWORD_FILE,
168     constants.RAPI_CERT_FILE,
169     constants.RAPI_USERS_FILE,
170     constants.HMAC_CLUSTER_KEY,
171     ])
172
173   for hv_name in constants.HYPER_TYPES:
174     hv_class = hypervisor.GetHypervisorClass(hv_name)
175     allowed_files.update(hv_class.GetAncillaryFiles())
176
177   return frozenset(allowed_files)
178
179
180 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181
182
183 def JobQueuePurge():
184   """Removes job queue files and archived jobs.
185
186   @rtype: tuple
187   @return: True, None
188
189   """
190   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192
193
194 def GetMasterInfo():
195   """Returns master information.
196
197   This is an utility function to compute master information, either
198   for consumption here or from the node daemon.
199
200   @rtype: tuple
201   @return: master_netdev, master_ip, master_name
202   @raise RPCFail: in case of errors
203
204   """
205   try:
206     cfg = _GetConfig()
207     master_netdev = cfg.GetMasterNetdev()
208     master_ip = cfg.GetMasterIP()
209     master_node = cfg.GetMasterNode()
210   except errors.ConfigurationError, err:
211     _Fail("Cluster configuration incomplete: %s", err, exc=True)
212   return (master_netdev, master_ip, master_node)
213
214
215 def StartMaster(start_daemons, no_voting):
216   """Activate local node as master node.
217
218   The function will always try activate the IP address of the master
219   (unless someone else has it). It will also start the master daemons,
220   based on the start_daemons parameter.
221
222   @type start_daemons: boolean
223   @param start_daemons: whether to also start the master
224       daemons (ganeti-masterd and ganeti-rapi)
225   @type no_voting: boolean
226   @param no_voting: whether to start ganeti-masterd without a node vote
227       (if start_daemons is True), but still non-interactively
228   @rtype: None
229
230   """
231   # GetMasterInfo will raise an exception if not able to return data
232   master_netdev, master_ip, _ = GetMasterInfo()
233
234   err_msgs = []
235   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236     if utils.OwnIpAddress(master_ip):
237       # we already have the ip:
238       logging.debug("Master IP already configured, doing nothing")
239     else:
240       msg = "Someone else has the master ip, not activating"
241       logging.error(msg)
242       err_msgs.append(msg)
243   else:
244     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245                            "dev", master_netdev, "label",
246                            "%s:0" % master_netdev])
247     if result.failed:
248       msg = "Can't activate master IP: %s" % result.output
249       logging.error(msg)
250       err_msgs.append(msg)
251
252     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253                            "-s", master_ip, master_ip])
254     # we'll ignore the exit code of arping
255
256   # and now start the master and rapi daemons
257   if start_daemons:
258     daemons_params = {
259         'ganeti-masterd': [],
260         'ganeti-rapi': [],
261         }
262     if no_voting:
263       daemons_params['ganeti-masterd'].append('--no-voting')
264       daemons_params['ganeti-masterd'].append('--yes-do-it')
265     for daemon in daemons_params:
266       cmd = [daemon]
267       cmd.extend(daemons_params[daemon])
268       result = utils.RunCmd(cmd)
269       if result.failed:
270         msg = "Can't start daemon %s: %s" % (daemon, result.output)
271         logging.error(msg)
272         err_msgs.append(msg)
273
274   if err_msgs:
275     _Fail("; ".join(err_msgs))
276
277
278 def StopMaster(stop_daemons):
279   """Deactivate this node as master.
280
281   The function will always try to deactivate the IP address of the
282   master. It will also stop the master daemons depending on the
283   stop_daemons parameter.
284
285   @type stop_daemons: boolean
286   @param stop_daemons: whether to also stop the master daemons
287       (ganeti-masterd and ganeti-rapi)
288   @rtype: None
289
290   """
291   # TODO: log and report back to the caller the error failures; we
292   # need to decide in which case we fail the RPC for this
293
294   # GetMasterInfo will raise an exception if not able to return data
295   master_netdev, master_ip, _ = GetMasterInfo()
296
297   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298                          "dev", master_netdev])
299   if result.failed:
300     logging.error("Can't remove the master IP, error: %s", result.output)
301     # but otherwise ignore the failure
302
303   if stop_daemons:
304     # stop/kill the rapi and the master daemon
305     for daemon in constants.RAPI, constants.MASTERD:
306       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307
308
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310   """Joins this node to the cluster.
311
312   This does the following:
313       - updates the hostkeys of the machine (rsa and dsa)
314       - adds the ssh private key to the user
315       - adds the ssh public key to the users' authorized_keys file
316
317   @type dsa: str
318   @param dsa: the DSA private key to write
319   @type dsapub: str
320   @param dsapub: the DSA public key to write
321   @type rsa: str
322   @param rsa: the RSA private key to write
323   @type rsapub: str
324   @param rsapub: the RSA public key to write
325   @type sshkey: str
326   @param sshkey: the SSH private key to write
327   @type sshpub: str
328   @param sshpub: the SSH public key to write
329   @rtype: boolean
330   @return: the success of the operation
331
332   """
333   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337   for name, content, mode in sshd_keys:
338     utils.WriteFile(name, data=content, mode=mode)
339
340   try:
341     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342                                                     mkdir=True)
343   except errors.OpExecError, err:
344     _Fail("Error while processing user ssh files: %s", err, exc=True)
345
346   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347     utils.WriteFile(name, data=content, mode=0600)
348
349   utils.AddAuthorizedKey(auth_keys, sshpub)
350
351   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352
353
354 def LeaveCluster(modify_ssh_setup):
355   """Cleans up and remove the current node.
356
357   This function cleans up and prepares the current node to be removed
358   from the cluster.
359
360   If processing is successful, then it raises an
361   L{errors.QuitGanetiException} which is used as a special case to
362   shutdown the node daemon.
363
364   @param modify_ssh_setup: boolean
365
366   """
367   _CleanDirectory(constants.DATA_DIR)
368   JobQueuePurge()
369
370   if modify_ssh_setup:
371     try:
372       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
373
374       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
375
376       utils.RemoveFile(priv_key)
377       utils.RemoveFile(pub_key)
378     except errors.OpExecError:
379       logging.exception("Error while processing ssh files")
380
381   try:
382     utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
383     utils.RemoveFile(constants.RAPI_CERT_FILE)
384     utils.RemoveFile(constants.SSL_CERT_FILE)
385   except:
386     logging.exception("Error while removing cluster secrets")
387
388   confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
389
390   if confd_pid:
391     utils.KillProcess(confd_pid, timeout=2)
392
393   # Raise a custom exception (handled in ganeti-noded)
394   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
395
396
397 def GetNodeInfo(vgname, hypervisor_type):
398   """Gives back a hash with different information about the node.
399
400   @type vgname: C{string}
401   @param vgname: the name of the volume group to ask for disk space information
402   @type hypervisor_type: C{str}
403   @param hypervisor_type: the name of the hypervisor to ask for
404       memory information
405   @rtype: C{dict}
406   @return: dictionary with the following keys:
407       - vg_size is the size of the configured volume group in MiB
408       - vg_free is the free size of the volume group in MiB
409       - memory_dom0 is the memory allocated for domain0 in MiB
410       - memory_free is the currently available (free) ram in MiB
411       - memory_total is the total number of ram in MiB
412
413   """
414   outputarray = {}
415   vginfo = _GetVGInfo(vgname)
416   outputarray['vg_size'] = vginfo['vg_size']
417   outputarray['vg_free'] = vginfo['vg_free']
418
419   hyper = hypervisor.GetHypervisor(hypervisor_type)
420   hyp_info = hyper.GetNodeInfo()
421   if hyp_info is not None:
422     outputarray.update(hyp_info)
423
424   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
425
426   return outputarray
427
428
429 def VerifyNode(what, cluster_name):
430   """Verify the status of the local node.
431
432   Based on the input L{what} parameter, various checks are done on the
433   local node.
434
435   If the I{filelist} key is present, this list of
436   files is checksummed and the file/checksum pairs are returned.
437
438   If the I{nodelist} key is present, we check that we have
439   connectivity via ssh with the target nodes (and check the hostname
440   report).
441
442   If the I{node-net-test} key is present, we check that we have
443   connectivity to the given nodes via both primary IP and, if
444   applicable, secondary IPs.
445
446   @type what: C{dict}
447   @param what: a dictionary of things to check:
448       - filelist: list of files for which to compute checksums
449       - nodelist: list of nodes we should check ssh communication with
450       - node-net-test: list of nodes we should check node daemon port
451         connectivity with
452       - hypervisor: list with hypervisors to run the verify for
453   @rtype: dict
454   @return: a dictionary with the same keys as the input dict, and
455       values representing the result of the checks
456
457   """
458   result = {}
459
460   if constants.NV_HYPERVISOR in what:
461     result[constants.NV_HYPERVISOR] = tmp = {}
462     for hv_name in what[constants.NV_HYPERVISOR]:
463       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
464
465   if constants.NV_FILELIST in what:
466     result[constants.NV_FILELIST] = utils.FingerprintFiles(
467       what[constants.NV_FILELIST])
468
469   if constants.NV_NODELIST in what:
470     result[constants.NV_NODELIST] = tmp = {}
471     random.shuffle(what[constants.NV_NODELIST])
472     for node in what[constants.NV_NODELIST]:
473       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
474       if not success:
475         tmp[node] = message
476
477   if constants.NV_NODENETTEST in what:
478     result[constants.NV_NODENETTEST] = tmp = {}
479     my_name = utils.HostInfo().name
480     my_pip = my_sip = None
481     for name, pip, sip in what[constants.NV_NODENETTEST]:
482       if name == my_name:
483         my_pip = pip
484         my_sip = sip
485         break
486     if not my_pip:
487       tmp[my_name] = ("Can't find my own primary/secondary IP"
488                       " in the node list")
489     else:
490       port = utils.GetDaemonPort(constants.NODED)
491       for name, pip, sip in what[constants.NV_NODENETTEST]:
492         fail = []
493         if not utils.TcpPing(pip, port, source=my_pip):
494           fail.append("primary")
495         if sip != pip:
496           if not utils.TcpPing(sip, port, source=my_sip):
497             fail.append("secondary")
498         if fail:
499           tmp[name] = ("failure using the %s interface(s)" %
500                        " and ".join(fail))
501
502   if constants.NV_LVLIST in what:
503     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
504
505   if constants.NV_INSTANCELIST in what:
506     result[constants.NV_INSTANCELIST] = GetInstanceList(
507       what[constants.NV_INSTANCELIST])
508
509   if constants.NV_VGLIST in what:
510     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
511
512   if constants.NV_PVLIST in what:
513     result[constants.NV_PVLIST] = \
514       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
515                                    filter_allocatable=False)
516
517   if constants.NV_VERSION in what:
518     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
519                                     constants.RELEASE_VERSION)
520
521   if constants.NV_HVINFO in what:
522     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
523     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
524
525   if constants.NV_DRBDLIST in what:
526     try:
527       used_minors = bdev.DRBD8.GetUsedDevs().keys()
528     except errors.BlockDeviceError, err:
529       logging.warning("Can't get used minors list", exc_info=True)
530       used_minors = str(err)
531     result[constants.NV_DRBDLIST] = used_minors
532
533   if constants.NV_NODESETUP in what:
534     result[constants.NV_NODESETUP] = tmpr = []
535     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
536       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
537                   " under /sys, missing required directories /sys/block"
538                   " and /sys/class/net")
539     if (not os.path.isdir("/proc/sys") or
540         not os.path.isfile("/proc/sysrq-trigger")):
541       tmpr.append("The procfs filesystem doesn't seem to be mounted"
542                   " under /proc, missing required directory /proc/sys and"
543                   " the file /proc/sysrq-trigger")
544   return result
545
546
547 def GetVolumeList(vg_name):
548   """Compute list of logical volumes and their size.
549
550   @type vg_name: str
551   @param vg_name: the volume group whose LVs we should list
552   @rtype: dict
553   @return:
554       dictionary of all partions (key) with value being a tuple of
555       their size (in MiB), inactive and online status::
556
557         {'test1': ('20.06', True, True)}
558
559       in case of errors, a string is returned with the error
560       details.
561
562   """
563   lvs = {}
564   sep = '|'
565   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
566                          "--separator=%s" % sep,
567                          "-olv_name,lv_size,lv_attr", vg_name])
568   if result.failed:
569     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
570
571   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
572   for line in result.stdout.splitlines():
573     line = line.strip()
574     match = valid_line_re.match(line)
575     if not match:
576       logging.error("Invalid line returned from lvs output: '%s'", line)
577       continue
578     name, size, attr = match.groups()
579     inactive = attr[4] == '-'
580     online = attr[5] == 'o'
581     virtual = attr[0] == 'v'
582     if virtual:
583       # we don't want to report such volumes as existing, since they
584       # don't really hold data
585       continue
586     lvs[name] = (size, inactive, online)
587
588   return lvs
589
590
591 def ListVolumeGroups():
592   """List the volume groups and their size.
593
594   @rtype: dict
595   @return: dictionary with keys volume name and values the
596       size of the volume
597
598   """
599   return utils.ListVolumeGroups()
600
601
602 def NodeVolumes():
603   """List all volumes on this node.
604
605   @rtype: list
606   @return:
607     A list of dictionaries, each having four keys:
608       - name: the logical volume name,
609       - size: the size of the logical volume
610       - dev: the physical device on which the LV lives
611       - vg: the volume group to which it belongs
612
613     In case of errors, we return an empty list and log the
614     error.
615
616     Note that since a logical volume can live on multiple physical
617     volumes, the resulting list might include a logical volume
618     multiple times.
619
620   """
621   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
622                          "--separator=|",
623                          "--options=lv_name,lv_size,devices,vg_name"])
624   if result.failed:
625     _Fail("Failed to list logical volumes, lvs output: %s",
626           result.output)
627
628   def parse_dev(dev):
629     if '(' in dev:
630       return dev.split('(')[0]
631     else:
632       return dev
633
634   def map_line(line):
635     return {
636       'name': line[0].strip(),
637       'size': line[1].strip(),
638       'dev': parse_dev(line[2].strip()),
639       'vg': line[3].strip(),
640     }
641
642   return [map_line(line.split('|')) for line in result.stdout.splitlines()
643           if line.count('|') >= 3]
644
645
646 def BridgesExist(bridges_list):
647   """Check if a list of bridges exist on the current node.
648
649   @rtype: boolean
650   @return: C{True} if all of them exist, C{False} otherwise
651
652   """
653   missing = []
654   for bridge in bridges_list:
655     if not utils.BridgeExists(bridge):
656       missing.append(bridge)
657
658   if missing:
659     _Fail("Missing bridges %s", ", ".join(missing))
660
661
662 def GetInstanceList(hypervisor_list):
663   """Provides a list of instances.
664
665   @type hypervisor_list: list
666   @param hypervisor_list: the list of hypervisors to query information
667
668   @rtype: list
669   @return: a list of all running instances on the current node
670     - instance1.example.com
671     - instance2.example.com
672
673   """
674   results = []
675   for hname in hypervisor_list:
676     try:
677       names = hypervisor.GetHypervisor(hname).ListInstances()
678       results.extend(names)
679     except errors.HypervisorError, err:
680       _Fail("Error enumerating instances (hypervisor %s): %s",
681             hname, err, exc=True)
682
683   return results
684
685
686 def GetInstanceInfo(instance, hname):
687   """Gives back the information about an instance as a dictionary.
688
689   @type instance: string
690   @param instance: the instance name
691   @type hname: string
692   @param hname: the hypervisor type of the instance
693
694   @rtype: dict
695   @return: dictionary with the following keys:
696       - memory: memory size of instance (int)
697       - state: xen state of instance (string)
698       - time: cpu time of instance (float)
699
700   """
701   output = {}
702
703   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
704   if iinfo is not None:
705     output['memory'] = iinfo[2]
706     output['state'] = iinfo[4]
707     output['time'] = iinfo[5]
708
709   return output
710
711
712 def GetInstanceMigratable(instance):
713   """Gives whether an instance can be migrated.
714
715   @type instance: L{objects.Instance}
716   @param instance: object representing the instance to be checked.
717
718   @rtype: tuple
719   @return: tuple of (result, description) where:
720       - result: whether the instance can be migrated or not
721       - description: a description of the issue, if relevant
722
723   """
724   hyper = hypervisor.GetHypervisor(instance.hypervisor)
725   iname = instance.name
726   if iname not in hyper.ListInstances():
727     _Fail("Instance %s is not running", iname)
728
729   for idx in range(len(instance.disks)):
730     link_name = _GetBlockDevSymlinkPath(iname, idx)
731     if not os.path.islink(link_name):
732       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
733
734
735 def GetAllInstancesInfo(hypervisor_list):
736   """Gather data about all instances.
737
738   This is the equivalent of L{GetInstanceInfo}, except that it
739   computes data for all instances at once, thus being faster if one
740   needs data about more than one instance.
741
742   @type hypervisor_list: list
743   @param hypervisor_list: list of hypervisors to query for instance data
744
745   @rtype: dict
746   @return: dictionary of instance: data, with data having the following keys:
747       - memory: memory size of instance (int)
748       - state: xen state of instance (string)
749       - time: cpu time of instance (float)
750       - vcpus: the number of vcpus
751
752   """
753   output = {}
754
755   for hname in hypervisor_list:
756     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
757     if iinfo:
758       for name, _, memory, vcpus, state, times in iinfo:
759         value = {
760           'memory': memory,
761           'vcpus': vcpus,
762           'state': state,
763           'time': times,
764           }
765         if name in output:
766           # we only check static parameters, like memory and vcpus,
767           # and not state and time which can change between the
768           # invocations of the different hypervisors
769           for key in 'memory', 'vcpus':
770             if value[key] != output[name][key]:
771               _Fail("Instance %s is running twice"
772                     " with different parameters", name)
773         output[name] = value
774
775   return output
776
777
778 def InstanceOsAdd(instance, reinstall):
779   """Add an OS to an instance.
780
781   @type instance: L{objects.Instance}
782   @param instance: Instance whose OS is to be installed
783   @type reinstall: boolean
784   @param reinstall: whether this is an instance reinstall
785   @rtype: None
786
787   """
788   inst_os = OSFromDisk(instance.os)
789
790   create_env = OSEnvironment(instance, inst_os)
791   if reinstall:
792     create_env['INSTANCE_REINSTALL'] = "1"
793
794   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
795                                      instance.name, int(time.time()))
796
797   result = utils.RunCmd([inst_os.create_script], env=create_env,
798                         cwd=inst_os.path, output=logfile,)
799   if result.failed:
800     logging.error("os create command '%s' returned error: %s, logfile: %s,"
801                   " output: %s", result.cmd, result.fail_reason, logfile,
802                   result.output)
803     lines = [utils.SafeEncode(val)
804              for val in utils.TailFile(logfile, lines=20)]
805     _Fail("OS create script failed (%s), last lines in the"
806           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
807
808
809 def RunRenameInstance(instance, old_name):
810   """Run the OS rename script for an instance.
811
812   @type instance: L{objects.Instance}
813   @param instance: Instance whose OS is to be installed
814   @type old_name: string
815   @param old_name: previous instance name
816   @rtype: boolean
817   @return: the success of the operation
818
819   """
820   inst_os = OSFromDisk(instance.os)
821
822   rename_env = OSEnvironment(instance, inst_os)
823   rename_env['OLD_INSTANCE_NAME'] = old_name
824
825   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
826                                            old_name,
827                                            instance.name, int(time.time()))
828
829   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
830                         cwd=inst_os.path, output=logfile)
831
832   if result.failed:
833     logging.error("os create command '%s' returned error: %s output: %s",
834                   result.cmd, result.fail_reason, result.output)
835     lines = [utils.SafeEncode(val)
836              for val in utils.TailFile(logfile, lines=20)]
837     _Fail("OS rename script failed (%s), last lines in the"
838           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
839
840
841 def _GetVGInfo(vg_name):
842   """Get information about the volume group.
843
844   @type vg_name: str
845   @param vg_name: the volume group which we query
846   @rtype: dict
847   @return:
848     A dictionary with the following keys:
849       - C{vg_size} is the total size of the volume group in MiB
850       - C{vg_free} is the free size of the volume group in MiB
851       - C{pv_count} are the number of physical disks in that VG
852
853     If an error occurs during gathering of data, we return the same dict
854     with keys all set to None.
855
856   """
857   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
858
859   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
860                          "--nosuffix", "--units=m", "--separator=:", vg_name])
861
862   if retval.failed:
863     logging.error("volume group %s not present", vg_name)
864     return retdic
865   valarr = retval.stdout.strip().rstrip(':').split(':')
866   if len(valarr) == 3:
867     try:
868       retdic = {
869         "vg_size": int(round(float(valarr[0]), 0)),
870         "vg_free": int(round(float(valarr[1]), 0)),
871         "pv_count": int(valarr[2]),
872         }
873     except ValueError, err:
874       logging.exception("Fail to parse vgs output: %s", err)
875   else:
876     logging.error("vgs output has the wrong number of fields (expected"
877                   " three): %s", str(valarr))
878   return retdic
879
880
881 def _GetBlockDevSymlinkPath(instance_name, idx):
882   return os.path.join(constants.DISK_LINKS_DIR,
883                       "%s:%d" % (instance_name, idx))
884
885
886 def _SymlinkBlockDev(instance_name, device_path, idx):
887   """Set up symlinks to a instance's block device.
888
889   This is an auxiliary function run when an instance is start (on the primary
890   node) or when an instance is migrated (on the target node).
891
892
893   @param instance_name: the name of the target instance
894   @param device_path: path of the physical block device, on the node
895   @param idx: the disk index
896   @return: absolute path to the disk's symlink
897
898   """
899   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
900   try:
901     os.symlink(device_path, link_name)
902   except OSError, err:
903     if err.errno == errno.EEXIST:
904       if (not os.path.islink(link_name) or
905           os.readlink(link_name) != device_path):
906         os.remove(link_name)
907         os.symlink(device_path, link_name)
908     else:
909       raise
910
911   return link_name
912
913
914 def _RemoveBlockDevLinks(instance_name, disks):
915   """Remove the block device symlinks belonging to the given instance.
916
917   """
918   for idx, _ in enumerate(disks):
919     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
920     if os.path.islink(link_name):
921       try:
922         os.remove(link_name)
923       except OSError:
924         logging.exception("Can't remove symlink '%s'", link_name)
925
926
927 def _GatherAndLinkBlockDevs(instance):
928   """Set up an instance's block device(s).
929
930   This is run on the primary node at instance startup. The block
931   devices must be already assembled.
932
933   @type instance: L{objects.Instance}
934   @param instance: the instance whose disks we shoul assemble
935   @rtype: list
936   @return: list of (disk_object, device_path)
937
938   """
939   block_devices = []
940   for idx, disk in enumerate(instance.disks):
941     device = _RecursiveFindBD(disk)
942     if device is None:
943       raise errors.BlockDeviceError("Block device '%s' is not set up." %
944                                     str(disk))
945     device.Open()
946     try:
947       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
948     except OSError, e:
949       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
950                                     e.strerror)
951
952     block_devices.append((disk, link_name))
953
954   return block_devices
955
956
957 def StartInstance(instance):
958   """Start an instance.
959
960   @type instance: L{objects.Instance}
961   @param instance: the instance object
962   @rtype: None
963
964   """
965   running_instances = GetInstanceList([instance.hypervisor])
966
967   if instance.name in running_instances:
968     logging.info("Instance %s already running, not starting", instance.name)
969     return
970
971   try:
972     block_devices = _GatherAndLinkBlockDevs(instance)
973     hyper = hypervisor.GetHypervisor(instance.hypervisor)
974     hyper.StartInstance(instance, block_devices)
975   except errors.BlockDeviceError, err:
976     _Fail("Block device error: %s", err, exc=True)
977   except errors.HypervisorError, err:
978     _RemoveBlockDevLinks(instance.name, instance.disks)
979     _Fail("Hypervisor error: %s", err, exc=True)
980
981
982 def InstanceShutdown(instance, timeout):
983   """Shut an instance down.
984
985   @note: this functions uses polling with a hardcoded timeout.
986
987   @type instance: L{objects.Instance}
988   @param instance: the instance object
989   @type timeout: integer
990   @param timeout: maximum timeout for soft shutdown
991   @rtype: None
992
993   """
994   hv_name = instance.hypervisor
995   hyper = hypervisor.GetHypervisor(hv_name)
996   iname = instance.name
997
998   if instance.name not in hyper.ListInstances():
999     logging.info("Instance %s not running, doing nothing", iname)
1000     return
1001
1002   class _TryShutdown:
1003     def __init__(self):
1004       self.tried_once = False
1005
1006     def __call__(self):
1007       if iname not in hyper.ListInstances():
1008         return
1009
1010       try:
1011         hyper.StopInstance(instance, retry=self.tried_once)
1012       except errors.HypervisorError, err:
1013         if iname not in hyper.ListInstances():
1014           # if the instance is no longer existing, consider this a
1015           # success and go to cleanup
1016           return
1017
1018         _Fail("Failed to stop instance %s: %s", iname, err)
1019
1020       self.tried_once = True
1021
1022       raise utils.RetryAgain()
1023
1024   try:
1025     utils.Retry(_TryShutdown(), 5, timeout)
1026   except utils.RetryTimeout:
1027     # the shutdown did not succeed
1028     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1029
1030     try:
1031       hyper.StopInstance(instance, force=True)
1032     except errors.HypervisorError, err:
1033       if iname in hyper.ListInstances():
1034         # only raise an error if the instance still exists, otherwise
1035         # the error could simply be "instance ... unknown"!
1036         _Fail("Failed to force stop instance %s: %s", iname, err)
1037
1038     time.sleep(1)
1039
1040     if iname in hyper.ListInstances():
1041       _Fail("Could not shutdown instance %s even by destroy", iname)
1042
1043   _RemoveBlockDevLinks(iname, instance.disks)
1044
1045
1046 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1047   """Reboot an instance.
1048
1049   @type instance: L{objects.Instance}
1050   @param instance: the instance object to reboot
1051   @type reboot_type: str
1052   @param reboot_type: the type of reboot, one the following
1053     constants:
1054       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1055         instance OS, do not recreate the VM
1056       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1057         restart the VM (at the hypervisor level)
1058       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1059         not accepted here, since that mode is handled differently, in
1060         cmdlib, and translates into full stop and start of the
1061         instance (instead of a call_instance_reboot RPC)
1062   @type shutdown_timeout: integer
1063   @param shutdown_timeout: maximum timeout for soft shutdown
1064   @rtype: None
1065
1066   """
1067   running_instances = GetInstanceList([instance.hypervisor])
1068
1069   if instance.name not in running_instances:
1070     _Fail("Cannot reboot instance %s that is not running", instance.name)
1071
1072   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1073   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1074     try:
1075       hyper.RebootInstance(instance)
1076     except errors.HypervisorError, err:
1077       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1078   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1079     try:
1080       InstanceShutdown(instance, shutdown_timeout)
1081       return StartInstance(instance)
1082     except errors.HypervisorError, err:
1083       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1084   else:
1085     _Fail("Invalid reboot_type received: %s", reboot_type)
1086
1087
1088 def MigrationInfo(instance):
1089   """Gather information about an instance to be migrated.
1090
1091   @type instance: L{objects.Instance}
1092   @param instance: the instance definition
1093
1094   """
1095   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1096   try:
1097     info = hyper.MigrationInfo(instance)
1098   except errors.HypervisorError, err:
1099     _Fail("Failed to fetch migration information: %s", err, exc=True)
1100   return info
1101
1102
1103 def AcceptInstance(instance, info, target):
1104   """Prepare the node to accept an instance.
1105
1106   @type instance: L{objects.Instance}
1107   @param instance: the instance definition
1108   @type info: string/data (opaque)
1109   @param info: migration information, from the source node
1110   @type target: string
1111   @param target: target host (usually ip), on this node
1112
1113   """
1114   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1115   try:
1116     hyper.AcceptInstance(instance, info, target)
1117   except errors.HypervisorError, err:
1118     _Fail("Failed to accept instance: %s", err, exc=True)
1119
1120
1121 def FinalizeMigration(instance, info, success):
1122   """Finalize any preparation to accept an instance.
1123
1124   @type instance: L{objects.Instance}
1125   @param instance: the instance definition
1126   @type info: string/data (opaque)
1127   @param info: migration information, from the source node
1128   @type success: boolean
1129   @param success: whether the migration was a success or a failure
1130
1131   """
1132   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1133   try:
1134     hyper.FinalizeMigration(instance, info, success)
1135   except errors.HypervisorError, err:
1136     _Fail("Failed to finalize migration: %s", err, exc=True)
1137
1138
1139 def MigrateInstance(instance, target, live):
1140   """Migrates an instance to another node.
1141
1142   @type instance: L{objects.Instance}
1143   @param instance: the instance definition
1144   @type target: string
1145   @param target: the target node name
1146   @type live: boolean
1147   @param live: whether the migration should be done live or not (the
1148       interpretation of this parameter is left to the hypervisor)
1149   @rtype: tuple
1150   @return: a tuple of (success, msg) where:
1151       - succes is a boolean denoting the success/failure of the operation
1152       - msg is a string with details in case of failure
1153
1154   """
1155   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1156
1157   try:
1158     hyper.MigrateInstance(instance, target, live)
1159   except errors.HypervisorError, err:
1160     _Fail("Failed to migrate instance: %s", err, exc=True)
1161
1162
1163 def BlockdevCreate(disk, size, owner, on_primary, info):
1164   """Creates a block device for an instance.
1165
1166   @type disk: L{objects.Disk}
1167   @param disk: the object describing the disk we should create
1168   @type size: int
1169   @param size: the size of the physical underlying device, in MiB
1170   @type owner: str
1171   @param owner: the name of the instance for which disk is created,
1172       used for device cache data
1173   @type on_primary: boolean
1174   @param on_primary:  indicates if it is the primary node or not
1175   @type info: string
1176   @param info: string that will be sent to the physical device
1177       creation, used for example to set (LVM) tags on LVs
1178
1179   @return: the new unique_id of the device (this can sometime be
1180       computed only after creation), or None. On secondary nodes,
1181       it's not required to return anything.
1182
1183   """
1184   clist = []
1185   if disk.children:
1186     for child in disk.children:
1187       try:
1188         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1189       except errors.BlockDeviceError, err:
1190         _Fail("Can't assemble device %s: %s", child, err)
1191       if on_primary or disk.AssembleOnSecondary():
1192         # we need the children open in case the device itself has to
1193         # be assembled
1194         try:
1195           crdev.Open()
1196         except errors.BlockDeviceError, err:
1197           _Fail("Can't make child '%s' read-write: %s", child, err)
1198       clist.append(crdev)
1199
1200   try:
1201     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1202   except errors.BlockDeviceError, err:
1203     _Fail("Can't create block device: %s", err)
1204
1205   if on_primary or disk.AssembleOnSecondary():
1206     try:
1207       device.Assemble()
1208     except errors.BlockDeviceError, err:
1209       _Fail("Can't assemble device after creation, unusual event: %s", err)
1210     device.SetSyncSpeed(constants.SYNC_SPEED)
1211     if on_primary or disk.OpenOnSecondary():
1212       try:
1213         device.Open(force=True)
1214       except errors.BlockDeviceError, err:
1215         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1216     DevCacheManager.UpdateCache(device.dev_path, owner,
1217                                 on_primary, disk.iv_name)
1218
1219   device.SetInfo(info)
1220
1221   return device.unique_id
1222
1223
1224 def BlockdevRemove(disk):
1225   """Remove a block device.
1226
1227   @note: This is intended to be called recursively.
1228
1229   @type disk: L{objects.Disk}
1230   @param disk: the disk object we should remove
1231   @rtype: boolean
1232   @return: the success of the operation
1233
1234   """
1235   msgs = []
1236   try:
1237     rdev = _RecursiveFindBD(disk)
1238   except errors.BlockDeviceError, err:
1239     # probably can't attach
1240     logging.info("Can't attach to device %s in remove", disk)
1241     rdev = None
1242   if rdev is not None:
1243     r_path = rdev.dev_path
1244     try:
1245       rdev.Remove()
1246     except errors.BlockDeviceError, err:
1247       msgs.append(str(err))
1248     if not msgs:
1249       DevCacheManager.RemoveCache(r_path)
1250
1251   if disk.children:
1252     for child in disk.children:
1253       try:
1254         BlockdevRemove(child)
1255       except RPCFail, err:
1256         msgs.append(str(err))
1257
1258   if msgs:
1259     _Fail("; ".join(msgs))
1260
1261
1262 def _RecursiveAssembleBD(disk, owner, as_primary):
1263   """Activate a block device for an instance.
1264
1265   This is run on the primary and secondary nodes for an instance.
1266
1267   @note: this function is called recursively.
1268
1269   @type disk: L{objects.Disk}
1270   @param disk: the disk we try to assemble
1271   @type owner: str
1272   @param owner: the name of the instance which owns the disk
1273   @type as_primary: boolean
1274   @param as_primary: if we should make the block device
1275       read/write
1276
1277   @return: the assembled device or None (in case no device
1278       was assembled)
1279   @raise errors.BlockDeviceError: in case there is an error
1280       during the activation of the children or the device
1281       itself
1282
1283   """
1284   children = []
1285   if disk.children:
1286     mcn = disk.ChildrenNeeded()
1287     if mcn == -1:
1288       mcn = 0 # max number of Nones allowed
1289     else:
1290       mcn = len(disk.children) - mcn # max number of Nones
1291     for chld_disk in disk.children:
1292       try:
1293         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1294       except errors.BlockDeviceError, err:
1295         if children.count(None) >= mcn:
1296           raise
1297         cdev = None
1298         logging.error("Error in child activation (but continuing): %s",
1299                       str(err))
1300       children.append(cdev)
1301
1302   if as_primary or disk.AssembleOnSecondary():
1303     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1304     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1305     result = r_dev
1306     if as_primary or disk.OpenOnSecondary():
1307       r_dev.Open()
1308     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1309                                 as_primary, disk.iv_name)
1310
1311   else:
1312     result = True
1313   return result
1314
1315
1316 def BlockdevAssemble(disk, owner, as_primary):
1317   """Activate a block device for an instance.
1318
1319   This is a wrapper over _RecursiveAssembleBD.
1320
1321   @rtype: str or boolean
1322   @return: a C{/dev/...} path for primary nodes, and
1323       C{True} for secondary nodes
1324
1325   """
1326   try:
1327     result = _RecursiveAssembleBD(disk, owner, as_primary)
1328     if isinstance(result, bdev.BlockDev):
1329       result = result.dev_path
1330   except errors.BlockDeviceError, err:
1331     _Fail("Error while assembling disk: %s", err, exc=True)
1332
1333   return result
1334
1335
1336 def BlockdevShutdown(disk):
1337   """Shut down a block device.
1338
1339   First, if the device is assembled (Attach() is successful), then
1340   the device is shutdown. Then the children of the device are
1341   shutdown.
1342
1343   This function is called recursively. Note that we don't cache the
1344   children or such, as oppossed to assemble, shutdown of different
1345   devices doesn't require that the upper device was active.
1346
1347   @type disk: L{objects.Disk}
1348   @param disk: the description of the disk we should
1349       shutdown
1350   @rtype: None
1351
1352   """
1353   msgs = []
1354   r_dev = _RecursiveFindBD(disk)
1355   if r_dev is not None:
1356     r_path = r_dev.dev_path
1357     try:
1358       r_dev.Shutdown()
1359       DevCacheManager.RemoveCache(r_path)
1360     except errors.BlockDeviceError, err:
1361       msgs.append(str(err))
1362
1363   if disk.children:
1364     for child in disk.children:
1365       try:
1366         BlockdevShutdown(child)
1367       except RPCFail, err:
1368         msgs.append(str(err))
1369
1370   if msgs:
1371     _Fail("; ".join(msgs))
1372
1373
1374 def BlockdevAddchildren(parent_cdev, new_cdevs):
1375   """Extend a mirrored block device.
1376
1377   @type parent_cdev: L{objects.Disk}
1378   @param parent_cdev: the disk to which we should add children
1379   @type new_cdevs: list of L{objects.Disk}
1380   @param new_cdevs: the list of children which we should add
1381   @rtype: None
1382
1383   """
1384   parent_bdev = _RecursiveFindBD(parent_cdev)
1385   if parent_bdev is None:
1386     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1387   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1388   if new_bdevs.count(None) > 0:
1389     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1390   parent_bdev.AddChildren(new_bdevs)
1391
1392
1393 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1394   """Shrink a mirrored block device.
1395
1396   @type parent_cdev: L{objects.Disk}
1397   @param parent_cdev: the disk from which we should remove children
1398   @type new_cdevs: list of L{objects.Disk}
1399   @param new_cdevs: the list of children which we should remove
1400   @rtype: None
1401
1402   """
1403   parent_bdev = _RecursiveFindBD(parent_cdev)
1404   if parent_bdev is None:
1405     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1406   devs = []
1407   for disk in new_cdevs:
1408     rpath = disk.StaticDevPath()
1409     if rpath is None:
1410       bd = _RecursiveFindBD(disk)
1411       if bd is None:
1412         _Fail("Can't find device %s while removing children", disk)
1413       else:
1414         devs.append(bd.dev_path)
1415     else:
1416       devs.append(rpath)
1417   parent_bdev.RemoveChildren(devs)
1418
1419
1420 def BlockdevGetmirrorstatus(disks):
1421   """Get the mirroring status of a list of devices.
1422
1423   @type disks: list of L{objects.Disk}
1424   @param disks: the list of disks which we should query
1425   @rtype: disk
1426   @return:
1427       a list of (mirror_done, estimated_time) tuples, which
1428       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1429   @raise errors.BlockDeviceError: if any of the disks cannot be
1430       found
1431
1432   """
1433   stats = []
1434   for dsk in disks:
1435     rbd = _RecursiveFindBD(dsk)
1436     if rbd is None:
1437       _Fail("Can't find device %s", dsk)
1438
1439     stats.append(rbd.CombinedSyncStatus())
1440
1441   return stats
1442
1443
1444 def _RecursiveFindBD(disk):
1445   """Check if a device is activated.
1446
1447   If so, return information about the real device.
1448
1449   @type disk: L{objects.Disk}
1450   @param disk: the disk object we need to find
1451
1452   @return: None if the device can't be found,
1453       otherwise the device instance
1454
1455   """
1456   children = []
1457   if disk.children:
1458     for chdisk in disk.children:
1459       children.append(_RecursiveFindBD(chdisk))
1460
1461   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1462
1463
1464 def BlockdevFind(disk):
1465   """Check if a device is activated.
1466
1467   If it is, return information about the real device.
1468
1469   @type disk: L{objects.Disk}
1470   @param disk: the disk to find
1471   @rtype: None or objects.BlockDevStatus
1472   @return: None if the disk cannot be found, otherwise a the current
1473            information
1474
1475   """
1476   try:
1477     rbd = _RecursiveFindBD(disk)
1478   except errors.BlockDeviceError, err:
1479     _Fail("Failed to find device: %s", err, exc=True)
1480
1481   if rbd is None:
1482     return None
1483
1484   return rbd.GetSyncStatus()
1485
1486
1487 def BlockdevGetsize(disks):
1488   """Computes the size of the given disks.
1489
1490   If a disk is not found, returns None instead.
1491
1492   @type disks: list of L{objects.Disk}
1493   @param disks: the list of disk to compute the size for
1494   @rtype: list
1495   @return: list with elements None if the disk cannot be found,
1496       otherwise the size
1497
1498   """
1499   result = []
1500   for cf in disks:
1501     try:
1502       rbd = _RecursiveFindBD(cf)
1503     except errors.BlockDeviceError, err:
1504       result.append(None)
1505       continue
1506     if rbd is None:
1507       result.append(None)
1508     else:
1509       result.append(rbd.GetActualSize())
1510   return result
1511
1512
1513 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1514   """Export a block device to a remote node.
1515
1516   @type disk: L{objects.Disk}
1517   @param disk: the description of the disk to export
1518   @type dest_node: str
1519   @param dest_node: the destination node to export to
1520   @type dest_path: str
1521   @param dest_path: the destination path on the target node
1522   @type cluster_name: str
1523   @param cluster_name: the cluster name, needed for SSH hostalias
1524   @rtype: None
1525
1526   """
1527   real_disk = _RecursiveFindBD(disk)
1528   if real_disk is None:
1529     _Fail("Block device '%s' is not set up", disk)
1530
1531   real_disk.Open()
1532
1533   # the block size on the read dd is 1MiB to match our units
1534   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1535                                "dd if=%s bs=1048576 count=%s",
1536                                real_disk.dev_path, str(disk.size))
1537
1538   # we set here a smaller block size as, due to ssh buffering, more
1539   # than 64-128k will mostly ignored; we use nocreat to fail if the
1540   # device is not already there or we pass a wrong path; we use
1541   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1542   # to not buffer too much memory; this means that at best, we flush
1543   # every 64k, which will not be very fast
1544   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1545                                 " oflag=dsync", dest_path)
1546
1547   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1548                                                    constants.GANETI_RUNAS,
1549                                                    destcmd)
1550
1551   # all commands have been checked, so we're safe to combine them
1552   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1553
1554   result = utils.RunCmd(["bash", "-c", command])
1555
1556   if result.failed:
1557     _Fail("Disk copy command '%s' returned error: %s"
1558           " output: %s", command, result.fail_reason, result.output)
1559
1560
1561 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1562   """Write a file to the filesystem.
1563
1564   This allows the master to overwrite(!) a file. It will only perform
1565   the operation if the file belongs to a list of configuration files.
1566
1567   @type file_name: str
1568   @param file_name: the target file name
1569   @type data: str
1570   @param data: the new contents of the file
1571   @type mode: int
1572   @param mode: the mode to give the file (can be None)
1573   @type uid: int
1574   @param uid: the owner of the file (can be -1 for default)
1575   @type gid: int
1576   @param gid: the group of the file (can be -1 for default)
1577   @type atime: float
1578   @param atime: the atime to set on the file (can be None)
1579   @type mtime: float
1580   @param mtime: the mtime to set on the file (can be None)
1581   @rtype: None
1582
1583   """
1584   if not os.path.isabs(file_name):
1585     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1586
1587   if file_name not in _ALLOWED_UPLOAD_FILES:
1588     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1589           file_name)
1590
1591   raw_data = _Decompress(data)
1592
1593   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1594                   atime=atime, mtime=mtime)
1595
1596
1597 def WriteSsconfFiles(values):
1598   """Update all ssconf files.
1599
1600   Wrapper around the SimpleStore.WriteFiles.
1601
1602   """
1603   ssconf.SimpleStore().WriteFiles(values)
1604
1605
1606 def _ErrnoOrStr(err):
1607   """Format an EnvironmentError exception.
1608
1609   If the L{err} argument has an errno attribute, it will be looked up
1610   and converted into a textual C{E...} description. Otherwise the
1611   string representation of the error will be returned.
1612
1613   @type err: L{EnvironmentError}
1614   @param err: the exception to format
1615
1616   """
1617   if hasattr(err, 'errno'):
1618     detail = errno.errorcode[err.errno]
1619   else:
1620     detail = str(err)
1621   return detail
1622
1623
1624 def _OSOndiskAPIVersion(name, os_dir):
1625   """Compute and return the API version of a given OS.
1626
1627   This function will try to read the API version of the OS given by
1628   the 'name' parameter and residing in the 'os_dir' directory.
1629
1630   @type name: str
1631   @param name: the OS name we should look for
1632   @type os_dir: str
1633   @param os_dir: the directory inwhich we should look for the OS
1634   @rtype: tuple
1635   @return: tuple (status, data) with status denoting the validity and
1636       data holding either the vaid versions or an error message
1637
1638   """
1639   api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1640
1641   try:
1642     st = os.stat(api_file)
1643   except EnvironmentError, err:
1644     return False, ("Required file '%s' not found under path %s: %s" %
1645                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1646
1647   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1648     return False, ("File '%s' in %s is not a regular file" %
1649                    (constants.OS_API_FILE, os_dir))
1650
1651   try:
1652     api_versions = utils.ReadFile(api_file).splitlines()
1653   except EnvironmentError, err:
1654     return False, ("Error while reading the API version file at %s: %s" %
1655                    (api_file, _ErrnoOrStr(err)))
1656
1657   try:
1658     api_versions = [int(version.strip()) for version in api_versions]
1659   except (TypeError, ValueError), err:
1660     return False, ("API version(s) can't be converted to integer: %s" %
1661                    str(err))
1662
1663   return True, api_versions
1664
1665
1666 def DiagnoseOS(top_dirs=None):
1667   """Compute the validity for all OSes.
1668
1669   @type top_dirs: list
1670   @param top_dirs: the list of directories in which to
1671       search (if not given defaults to
1672       L{constants.OS_SEARCH_PATH})
1673   @rtype: list of L{objects.OS}
1674   @return: a list of tuples (name, path, status, diagnose, variants)
1675       for all (potential) OSes under all search paths, where:
1676           - name is the (potential) OS name
1677           - path is the full path to the OS
1678           - status True/False is the validity of the OS
1679           - diagnose is the error message for an invalid OS, otherwise empty
1680           - variants is a list of supported OS variants, if any
1681
1682   """
1683   if top_dirs is None:
1684     top_dirs = constants.OS_SEARCH_PATH
1685
1686   result = []
1687   for dir_name in top_dirs:
1688     if os.path.isdir(dir_name):
1689       try:
1690         f_names = utils.ListVisibleFiles(dir_name)
1691       except EnvironmentError, err:
1692         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1693         break
1694       for name in f_names:
1695         os_path = os.path.sep.join([dir_name, name])
1696         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1697         if status:
1698           diagnose = ""
1699           variants = os_inst.supported_variants
1700         else:
1701           diagnose = os_inst
1702           variants = []
1703         result.append((name, os_path, status, diagnose, variants))
1704
1705   return result
1706
1707
1708 def _TryOSFromDisk(name, base_dir=None):
1709   """Create an OS instance from disk.
1710
1711   This function will return an OS instance if the given name is a
1712   valid OS name.
1713
1714   @type base_dir: string
1715   @keyword base_dir: Base directory containing OS installations.
1716                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1717   @rtype: tuple
1718   @return: success and either the OS instance if we find a valid one,
1719       or error message
1720
1721   """
1722   if base_dir is None:
1723     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1724     if os_dir is None:
1725       return False, "Directory for OS %s not found in search path" % name
1726   else:
1727     os_dir = os.path.sep.join([base_dir, name])
1728
1729   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1730   if not status:
1731     # push the error up
1732     return status, api_versions
1733
1734   if not constants.OS_API_VERSIONS.intersection(api_versions):
1735     return False, ("API version mismatch for path '%s': found %s, want %s." %
1736                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1737
1738   # OS Files dictionary, we will populate it with the absolute path names
1739   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1740
1741   if max(api_versions) >= constants.OS_API_V15:
1742     os_files[constants.OS_VARIANTS_FILE] = ''
1743
1744   for filename in os_files:
1745     os_files[filename] = os.path.sep.join([os_dir, filename])
1746
1747     try:
1748       st = os.stat(os_files[filename])
1749     except EnvironmentError, err:
1750       return False, ("File '%s' under path '%s' is missing (%s)" %
1751                      (filename, os_dir, _ErrnoOrStr(err)))
1752
1753     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1754       return False, ("File '%s' under path '%s' is not a regular file" %
1755                      (filename, os_dir))
1756
1757     if filename in constants.OS_SCRIPTS:
1758       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1759         return False, ("File '%s' under path '%s' is not executable" %
1760                        (filename, os_dir))
1761
1762   variants = None
1763   if constants.OS_VARIANTS_FILE in os_files:
1764     variants_file = os_files[constants.OS_VARIANTS_FILE]
1765     try:
1766       variants = utils.ReadFile(variants_file).splitlines()
1767     except EnvironmentError, err:
1768       return False, ("Error while reading the OS variants file at %s: %s" %
1769                      (variants_file, _ErrnoOrStr(err)))
1770     if not variants:
1771       return False, ("No supported os variant found")
1772
1773   os_obj = objects.OS(name=name, path=os_dir,
1774                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1775                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1776                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1777                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1778                       supported_variants=variants,
1779                       api_versions=api_versions)
1780   return True, os_obj
1781
1782
1783 def OSFromDisk(name, base_dir=None):
1784   """Create an OS instance from disk.
1785
1786   This function will return an OS instance if the given name is a
1787   valid OS name. Otherwise, it will raise an appropriate
1788   L{RPCFail} exception, detailing why this is not a valid OS.
1789
1790   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1791   an exception but returns true/false status data.
1792
1793   @type base_dir: string
1794   @keyword base_dir: Base directory containing OS installations.
1795                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1796   @rtype: L{objects.OS}
1797   @return: the OS instance if we find a valid one
1798   @raise RPCFail: if we don't find a valid OS
1799
1800   """
1801   name_only = name.split("+", 1)[0]
1802   status, payload = _TryOSFromDisk(name_only, base_dir)
1803
1804   if not status:
1805     _Fail(payload)
1806
1807   return payload
1808
1809
1810 def OSEnvironment(instance, inst_os, debug=0):
1811   """Calculate the environment for an os script.
1812
1813   @type instance: L{objects.Instance}
1814   @param instance: target instance for the os script run
1815   @type inst_os: L{objects.OS}
1816   @param inst_os: operating system for which the environment is being built
1817   @type debug: integer
1818   @param debug: debug level (0 or 1, for OS Api 10)
1819   @rtype: dict
1820   @return: dict of environment variables
1821   @raise errors.BlockDeviceError: if the block device
1822       cannot be found
1823
1824   """
1825   result = {}
1826   api_version = \
1827     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1828   result['OS_API_VERSION'] = '%d' % api_version
1829   result['INSTANCE_NAME'] = instance.name
1830   result['INSTANCE_OS'] = instance.os
1831   result['HYPERVISOR'] = instance.hypervisor
1832   result['DISK_COUNT'] = '%d' % len(instance.disks)
1833   result['NIC_COUNT'] = '%d' % len(instance.nics)
1834   result['DEBUG_LEVEL'] = '%d' % debug
1835   if api_version >= constants.OS_API_V15:
1836     try:
1837       variant = instance.os.split('+', 1)[1]
1838     except IndexError:
1839       variant = inst_os.supported_variants[0]
1840     result['OS_VARIANT'] = variant
1841   for idx, disk in enumerate(instance.disks):
1842     real_disk = _RecursiveFindBD(disk)
1843     if real_disk is None:
1844       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1845                                     str(disk))
1846     real_disk.Open()
1847     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1848     result['DISK_%d_ACCESS' % idx] = disk.mode
1849     if constants.HV_DISK_TYPE in instance.hvparams:
1850       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1851         instance.hvparams[constants.HV_DISK_TYPE]
1852     if disk.dev_type in constants.LDS_BLOCK:
1853       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1854     elif disk.dev_type == constants.LD_FILE:
1855       result['DISK_%d_BACKEND_TYPE' % idx] = \
1856         'file:%s' % disk.physical_id[0]
1857   for idx, nic in enumerate(instance.nics):
1858     result['NIC_%d_MAC' % idx] = nic.mac
1859     if nic.ip:
1860       result['NIC_%d_IP' % idx] = nic.ip
1861     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1862     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1863       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1864     if nic.nicparams[constants.NIC_LINK]:
1865       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1866     if constants.HV_NIC_TYPE in instance.hvparams:
1867       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1868         instance.hvparams[constants.HV_NIC_TYPE]
1869
1870   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1871     for key, value in source.items():
1872       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1873
1874   return result
1875
1876 def BlockdevGrow(disk, amount):
1877   """Grow a stack of block devices.
1878
1879   This function is called recursively, with the childrens being the
1880   first ones to resize.
1881
1882   @type disk: L{objects.Disk}
1883   @param disk: the disk to be grown
1884   @rtype: (status, result)
1885   @return: a tuple with the status of the operation
1886       (True/False), and the errors message if status
1887       is False
1888
1889   """
1890   r_dev = _RecursiveFindBD(disk)
1891   if r_dev is None:
1892     _Fail("Cannot find block device %s", disk)
1893
1894   try:
1895     r_dev.Grow(amount)
1896   except errors.BlockDeviceError, err:
1897     _Fail("Failed to grow block device: %s", err, exc=True)
1898
1899
1900 def BlockdevSnapshot(disk):
1901   """Create a snapshot copy of a block device.
1902
1903   This function is called recursively, and the snapshot is actually created
1904   just for the leaf lvm backend device.
1905
1906   @type disk: L{objects.Disk}
1907   @param disk: the disk to be snapshotted
1908   @rtype: string
1909   @return: snapshot disk path
1910
1911   """
1912   if disk.children:
1913     if len(disk.children) == 1:
1914       # only one child, let's recurse on it
1915       return BlockdevSnapshot(disk.children[0])
1916     else:
1917       # more than one child, choose one that matches
1918       for child in disk.children:
1919         if child.size == disk.size:
1920           # return implies breaking the loop
1921           return BlockdevSnapshot(child)
1922   elif disk.dev_type == constants.LD_LV:
1923     r_dev = _RecursiveFindBD(disk)
1924     if r_dev is not None:
1925       # let's stay on the safe side and ask for the full size, for now
1926       return r_dev.Snapshot(disk.size)
1927     else:
1928       _Fail("Cannot find block device %s", disk)
1929   else:
1930     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1931           disk.unique_id, disk.dev_type)
1932
1933
1934 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1935   """Export a block device snapshot to a remote node.
1936
1937   @type disk: L{objects.Disk}
1938   @param disk: the description of the disk to export
1939   @type dest_node: str
1940   @param dest_node: the destination node to export to
1941   @type instance: L{objects.Instance}
1942   @param instance: the instance object to whom the disk belongs
1943   @type cluster_name: str
1944   @param cluster_name: the cluster name, needed for SSH hostalias
1945   @type idx: int
1946   @param idx: the index of the disk in the instance's disk list,
1947       used to export to the OS scripts environment
1948   @rtype: None
1949
1950   """
1951   inst_os = OSFromDisk(instance.os)
1952   export_env = OSEnvironment(instance, inst_os)
1953
1954   export_script = inst_os.export_script
1955
1956   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1957                                      instance.name, int(time.time()))
1958   if not os.path.exists(constants.LOG_OS_DIR):
1959     os.mkdir(constants.LOG_OS_DIR, 0750)
1960   real_disk = _RecursiveFindBD(disk)
1961   if real_disk is None:
1962     _Fail("Block device '%s' is not set up", disk)
1963
1964   real_disk.Open()
1965
1966   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1967   export_env['EXPORT_INDEX'] = str(idx)
1968
1969   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1970   destfile = disk.physical_id[1]
1971
1972   # the target command is built out of three individual commands,
1973   # which are joined by pipes; we check each individual command for
1974   # valid parameters
1975   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1976                                inst_os.path, export_script, logfile)
1977
1978   comprcmd = "gzip"
1979
1980   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1981                                 destdir, destdir, destfile)
1982   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1983                                                    constants.GANETI_RUNAS,
1984                                                    destcmd)
1985
1986   # all commands have been checked, so we're safe to combine them
1987   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1988
1989   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1990
1991   if result.failed:
1992     _Fail("OS snapshot export command '%s' returned error: %s"
1993           " output: %s", command, result.fail_reason, result.output)
1994
1995
1996 def FinalizeExport(instance, snap_disks):
1997   """Write out the export configuration information.
1998
1999   @type instance: L{objects.Instance}
2000   @param instance: the instance which we export, used for
2001       saving configuration
2002   @type snap_disks: list of L{objects.Disk}
2003   @param snap_disks: list of snapshot block devices, which
2004       will be used to get the actual name of the dump file
2005
2006   @rtype: None
2007
2008   """
2009   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
2010   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
2011
2012   config = objects.SerializableConfigParser()
2013
2014   config.add_section(constants.INISECT_EXP)
2015   config.set(constants.INISECT_EXP, 'version', '0')
2016   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2017   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2018   config.set(constants.INISECT_EXP, 'os', instance.os)
2019   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2020
2021   config.add_section(constants.INISECT_INS)
2022   config.set(constants.INISECT_INS, 'name', instance.name)
2023   config.set(constants.INISECT_INS, 'memory', '%d' %
2024              instance.beparams[constants.BE_MEMORY])
2025   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2026              instance.beparams[constants.BE_VCPUS])
2027   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2028
2029   nic_total = 0
2030   for nic_count, nic in enumerate(instance.nics):
2031     nic_total += 1
2032     config.set(constants.INISECT_INS, 'nic%d_mac' %
2033                nic_count, '%s' % nic.mac)
2034     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2035     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2036                '%s' % nic.bridge)
2037   # TODO: redundant: on load can read nics until it doesn't exist
2038   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2039
2040   disk_total = 0
2041   for disk_count, disk in enumerate(snap_disks):
2042     if disk:
2043       disk_total += 1
2044       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2045                  ('%s' % disk.iv_name))
2046       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2047                  ('%s' % disk.physical_id[1]))
2048       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2049                  ('%d' % disk.size))
2050
2051   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2052
2053   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
2054                   data=config.Dumps())
2055   shutil.rmtree(finaldestdir, True)
2056   shutil.move(destdir, finaldestdir)
2057
2058
2059 def ExportInfo(dest):
2060   """Get export configuration information.
2061
2062   @type dest: str
2063   @param dest: directory containing the export
2064
2065   @rtype: L{objects.SerializableConfigParser}
2066   @return: a serializable config file containing the
2067       export info
2068
2069   """
2070   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2071
2072   config = objects.SerializableConfigParser()
2073   config.read(cff)
2074
2075   if (not config.has_section(constants.INISECT_EXP) or
2076       not config.has_section(constants.INISECT_INS)):
2077     _Fail("Export info file doesn't have the required fields")
2078
2079   return config.Dumps()
2080
2081
2082 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2083   """Import an os image into an instance.
2084
2085   @type instance: L{objects.Instance}
2086   @param instance: instance to import the disks into
2087   @type src_node: string
2088   @param src_node: source node for the disk images
2089   @type src_images: list of string
2090   @param src_images: absolute paths of the disk images
2091   @rtype: list of boolean
2092   @return: each boolean represent the success of importing the n-th disk
2093
2094   """
2095   inst_os = OSFromDisk(instance.os)
2096   import_env = OSEnvironment(instance, inst_os)
2097   import_script = inst_os.import_script
2098
2099   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2100                                         instance.name, int(time.time()))
2101   if not os.path.exists(constants.LOG_OS_DIR):
2102     os.mkdir(constants.LOG_OS_DIR, 0750)
2103
2104   comprcmd = "gunzip"
2105   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2106                                import_script, logfile)
2107
2108   final_result = []
2109   for idx, image in enumerate(src_images):
2110     if image:
2111       destcmd = utils.BuildShellCmd('cat %s', image)
2112       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2113                                                        constants.GANETI_RUNAS,
2114                                                        destcmd)
2115       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2116       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2117       import_env['IMPORT_INDEX'] = str(idx)
2118       result = utils.RunCmd(command, env=import_env)
2119       if result.failed:
2120         logging.error("Disk import command '%s' returned error: %s"
2121                       " output: %s", command, result.fail_reason,
2122                       result.output)
2123         final_result.append("error importing disk %d: %s, %s" %
2124                             (idx, result.fail_reason, result.output[-100]))
2125
2126   if final_result:
2127     _Fail("; ".join(final_result), log=False)
2128
2129
2130 def ListExports():
2131   """Return a list of exports currently available on this machine.
2132
2133   @rtype: list
2134   @return: list of the exports
2135
2136   """
2137   if os.path.isdir(constants.EXPORT_DIR):
2138     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2139   else:
2140     _Fail("No exports directory")
2141
2142
2143 def RemoveExport(export):
2144   """Remove an existing export from the node.
2145
2146   @type export: str
2147   @param export: the name of the export to remove
2148   @rtype: None
2149
2150   """
2151   target = os.path.join(constants.EXPORT_DIR, export)
2152
2153   try:
2154     shutil.rmtree(target)
2155   except EnvironmentError, err:
2156     _Fail("Error while removing the export: %s", err, exc=True)
2157
2158
2159 def BlockdevRename(devlist):
2160   """Rename a list of block devices.
2161
2162   @type devlist: list of tuples
2163   @param devlist: list of tuples of the form  (disk,
2164       new_logical_id, new_physical_id); disk is an
2165       L{objects.Disk} object describing the current disk,
2166       and new logical_id/physical_id is the name we
2167       rename it to
2168   @rtype: boolean
2169   @return: True if all renames succeeded, False otherwise
2170
2171   """
2172   msgs = []
2173   result = True
2174   for disk, unique_id in devlist:
2175     dev = _RecursiveFindBD(disk)
2176     if dev is None:
2177       msgs.append("Can't find device %s in rename" % str(disk))
2178       result = False
2179       continue
2180     try:
2181       old_rpath = dev.dev_path
2182       dev.Rename(unique_id)
2183       new_rpath = dev.dev_path
2184       if old_rpath != new_rpath:
2185         DevCacheManager.RemoveCache(old_rpath)
2186         # FIXME: we should add the new cache information here, like:
2187         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2188         # but we don't have the owner here - maybe parse from existing
2189         # cache? for now, we only lose lvm data when we rename, which
2190         # is less critical than DRBD or MD
2191     except errors.BlockDeviceError, err:
2192       msgs.append("Can't rename device '%s' to '%s': %s" %
2193                   (dev, unique_id, err))
2194       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2195       result = False
2196   if not result:
2197     _Fail("; ".join(msgs))
2198
2199
2200 def _TransformFileStorageDir(file_storage_dir):
2201   """Checks whether given file_storage_dir is valid.
2202
2203   Checks wheter the given file_storage_dir is within the cluster-wide
2204   default file_storage_dir stored in SimpleStore. Only paths under that
2205   directory are allowed.
2206
2207   @type file_storage_dir: str
2208   @param file_storage_dir: the path to check
2209
2210   @return: the normalized path if valid, None otherwise
2211
2212   """
2213   cfg = _GetConfig()
2214   file_storage_dir = os.path.normpath(file_storage_dir)
2215   base_file_storage_dir = cfg.GetFileStorageDir()
2216   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2217       base_file_storage_dir):
2218     _Fail("File storage directory '%s' is not under base file"
2219           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2220   return file_storage_dir
2221
2222
2223 def CreateFileStorageDir(file_storage_dir):
2224   """Create file storage directory.
2225
2226   @type file_storage_dir: str
2227   @param file_storage_dir: directory to create
2228
2229   @rtype: tuple
2230   @return: tuple with first element a boolean indicating wheter dir
2231       creation was successful or not
2232
2233   """
2234   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2235   if os.path.exists(file_storage_dir):
2236     if not os.path.isdir(file_storage_dir):
2237       _Fail("Specified storage dir '%s' is not a directory",
2238             file_storage_dir)
2239   else:
2240     try:
2241       os.makedirs(file_storage_dir, 0750)
2242     except OSError, err:
2243       _Fail("Cannot create file storage directory '%s': %s",
2244             file_storage_dir, err, exc=True)
2245
2246
2247 def RemoveFileStorageDir(file_storage_dir):
2248   """Remove file storage directory.
2249
2250   Remove it only if it's empty. If not log an error and return.
2251
2252   @type file_storage_dir: str
2253   @param file_storage_dir: the directory we should cleanup
2254   @rtype: tuple (success,)
2255   @return: tuple of one element, C{success}, denoting
2256       whether the operation was successful
2257
2258   """
2259   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2260   if os.path.exists(file_storage_dir):
2261     if not os.path.isdir(file_storage_dir):
2262       _Fail("Specified Storage directory '%s' is not a directory",
2263             file_storage_dir)
2264     # deletes dir only if empty, otherwise we want to fail the rpc call
2265     try:
2266       os.rmdir(file_storage_dir)
2267     except OSError, err:
2268       _Fail("Cannot remove file storage directory '%s': %s",
2269             file_storage_dir, err)
2270
2271
2272 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2273   """Rename the file storage directory.
2274
2275   @type old_file_storage_dir: str
2276   @param old_file_storage_dir: the current path
2277   @type new_file_storage_dir: str
2278   @param new_file_storage_dir: the name we should rename to
2279   @rtype: tuple (success,)
2280   @return: tuple of one element, C{success}, denoting
2281       whether the operation was successful
2282
2283   """
2284   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2285   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2286   if not os.path.exists(new_file_storage_dir):
2287     if os.path.isdir(old_file_storage_dir):
2288       try:
2289         os.rename(old_file_storage_dir, new_file_storage_dir)
2290       except OSError, err:
2291         _Fail("Cannot rename '%s' to '%s': %s",
2292               old_file_storage_dir, new_file_storage_dir, err)
2293     else:
2294       _Fail("Specified storage dir '%s' is not a directory",
2295             old_file_storage_dir)
2296   else:
2297     if os.path.exists(old_file_storage_dir):
2298       _Fail("Cannot rename '%s' to '%s': both locations exist",
2299             old_file_storage_dir, new_file_storage_dir)
2300
2301
2302 def _EnsureJobQueueFile(file_name):
2303   """Checks whether the given filename is in the queue directory.
2304
2305   @type file_name: str
2306   @param file_name: the file name we should check
2307   @rtype: None
2308   @raises RPCFail: if the file is not valid
2309
2310   """
2311   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2312   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2313
2314   if not result:
2315     _Fail("Passed job queue file '%s' does not belong to"
2316           " the queue directory '%s'", file_name, queue_dir)
2317
2318
2319 def JobQueueUpdate(file_name, content):
2320   """Updates a file in the queue directory.
2321
2322   This is just a wrapper over L{utils.WriteFile}, with proper
2323   checking.
2324
2325   @type file_name: str
2326   @param file_name: the job file name
2327   @type content: str
2328   @param content: the new job contents
2329   @rtype: boolean
2330   @return: the success of the operation
2331
2332   """
2333   _EnsureJobQueueFile(file_name)
2334
2335   # Write and replace the file atomically
2336   utils.WriteFile(file_name, data=_Decompress(content))
2337
2338
2339 def JobQueueRename(old, new):
2340   """Renames a job queue file.
2341
2342   This is just a wrapper over os.rename with proper checking.
2343
2344   @type old: str
2345   @param old: the old (actual) file name
2346   @type new: str
2347   @param new: the desired file name
2348   @rtype: tuple
2349   @return: the success of the operation and payload
2350
2351   """
2352   _EnsureJobQueueFile(old)
2353   _EnsureJobQueueFile(new)
2354
2355   utils.RenameFile(old, new, mkdir=True)
2356
2357
2358 def JobQueueSetDrainFlag(drain_flag):
2359   """Set the drain flag for the queue.
2360
2361   This will set or unset the queue drain flag.
2362
2363   @type drain_flag: boolean
2364   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2365   @rtype: truple
2366   @return: always True, None
2367   @warning: the function always returns True
2368
2369   """
2370   if drain_flag:
2371     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2372   else:
2373     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2374
2375
2376 def BlockdevClose(instance_name, disks):
2377   """Closes the given block devices.
2378
2379   This means they will be switched to secondary mode (in case of
2380   DRBD).
2381
2382   @param instance_name: if the argument is not empty, the symlinks
2383       of this instance will be removed
2384   @type disks: list of L{objects.Disk}
2385   @param disks: the list of disks to be closed
2386   @rtype: tuple (success, message)
2387   @return: a tuple of success and message, where success
2388       indicates the succes of the operation, and message
2389       which will contain the error details in case we
2390       failed
2391
2392   """
2393   bdevs = []
2394   for cf in disks:
2395     rd = _RecursiveFindBD(cf)
2396     if rd is None:
2397       _Fail("Can't find device %s", cf)
2398     bdevs.append(rd)
2399
2400   msg = []
2401   for rd in bdevs:
2402     try:
2403       rd.Close()
2404     except errors.BlockDeviceError, err:
2405       msg.append(str(err))
2406   if msg:
2407     _Fail("Can't make devices secondary: %s", ",".join(msg))
2408   else:
2409     if instance_name:
2410       _RemoveBlockDevLinks(instance_name, disks)
2411
2412
2413 def ValidateHVParams(hvname, hvparams):
2414   """Validates the given hypervisor parameters.
2415
2416   @type hvname: string
2417   @param hvname: the hypervisor name
2418   @type hvparams: dict
2419   @param hvparams: the hypervisor parameters to be validated
2420   @rtype: None
2421
2422   """
2423   try:
2424     hv_type = hypervisor.GetHypervisor(hvname)
2425     hv_type.ValidateParameters(hvparams)
2426   except errors.HypervisorError, err:
2427     _Fail(str(err), log=False)
2428
2429
2430 def DemoteFromMC():
2431   """Demotes the current node from master candidate role.
2432
2433   """
2434   # try to ensure we're not the master by mistake
2435   master, myself = ssconf.GetMasterAndMyself()
2436   if master == myself:
2437     _Fail("ssconf status shows I'm the master node, will not demote")
2438   pid_file = utils.DaemonPidFileName(constants.MASTERD)
2439   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2440     _Fail("The master daemon is running, will not demote")
2441   try:
2442     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2443       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2444   except EnvironmentError, err:
2445     if err.errno != errno.ENOENT:
2446       _Fail("Error while backing up cluster file: %s", err, exc=True)
2447   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2448
2449
2450 def _FindDisks(nodes_ip, disks):
2451   """Sets the physical ID on disks and returns the block devices.
2452
2453   """
2454   # set the correct physical ID
2455   my_name = utils.HostInfo().name
2456   for cf in disks:
2457     cf.SetPhysicalID(my_name, nodes_ip)
2458
2459   bdevs = []
2460
2461   for cf in disks:
2462     rd = _RecursiveFindBD(cf)
2463     if rd is None:
2464       _Fail("Can't find device %s", cf)
2465     bdevs.append(rd)
2466   return bdevs
2467
2468
2469 def DrbdDisconnectNet(nodes_ip, disks):
2470   """Disconnects the network on a list of drbd devices.
2471
2472   """
2473   bdevs = _FindDisks(nodes_ip, disks)
2474
2475   # disconnect disks
2476   for rd in bdevs:
2477     try:
2478       rd.DisconnectNet()
2479     except errors.BlockDeviceError, err:
2480       _Fail("Can't change network configuration to standalone mode: %s",
2481             err, exc=True)
2482
2483
2484 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2485   """Attaches the network on a list of drbd devices.
2486
2487   """
2488   bdevs = _FindDisks(nodes_ip, disks)
2489
2490   if multimaster:
2491     for idx, rd in enumerate(bdevs):
2492       try:
2493         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2494       except EnvironmentError, err:
2495         _Fail("Can't create symlink: %s", err)
2496   # reconnect disks, switch to new master configuration and if
2497   # needed primary mode
2498   for rd in bdevs:
2499     try:
2500       rd.AttachNet(multimaster)
2501     except errors.BlockDeviceError, err:
2502       _Fail("Can't change network configuration: %s", err)
2503
2504   # wait until the disks are connected; we need to retry the re-attach
2505   # if the device becomes standalone, as this might happen if the one
2506   # node disconnects and reconnects in a different mode before the
2507   # other node reconnects; in this case, one or both of the nodes will
2508   # decide it has wrong configuration and switch to standalone
2509
2510   def _Attach():
2511     all_connected = True
2512
2513     for rd in bdevs:
2514       stats = rd.GetProcStatus()
2515
2516       all_connected = (all_connected and
2517                        (stats.is_connected or stats.is_in_resync))
2518
2519       if stats.is_standalone:
2520         # peer had different config info and this node became
2521         # standalone, even though this should not happen with the
2522         # new staged way of changing disk configs
2523         try:
2524           rd.AttachNet(multimaster)
2525         except errors.BlockDeviceError, err:
2526           _Fail("Can't change network configuration: %s", err)
2527
2528     if not all_connected:
2529       raise utils.RetryAgain()
2530
2531   try:
2532     # Start with a delay of 100 miliseconds and go up to 5 seconds
2533     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2534   except utils.RetryTimeout:
2535     _Fail("Timeout in disk reconnecting")
2536
2537   if multimaster:
2538     # change to primary mode
2539     for rd in bdevs:
2540       try:
2541         rd.Open()
2542       except errors.BlockDeviceError, err:
2543         _Fail("Can't change to primary mode: %s", err)
2544
2545
2546 def DrbdWaitSync(nodes_ip, disks):
2547   """Wait until DRBDs have synchronized.
2548
2549   """
2550   def _helper(rd):
2551     stats = rd.GetProcStatus()
2552     if not (stats.is_connected or stats.is_in_resync):
2553       raise utils.RetryAgain()
2554     return stats
2555
2556   bdevs = _FindDisks(nodes_ip, disks)
2557
2558   min_resync = 100
2559   alldone = True
2560   for rd in bdevs:
2561     try:
2562       # poll each second for 15 seconds
2563       stats = utils.Retry(_helper, 1, 15, args=[rd])
2564     except utils.RetryTimeout:
2565       stats = rd.GetProcStatus()
2566       # last check
2567       if not (stats.is_connected or stats.is_in_resync):
2568         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2569     alldone = alldone and (not stats.is_in_resync)
2570     if stats.sync_percent is not None:
2571       min_resync = min(min_resync, stats.sync_percent)
2572
2573   return (alldone, min_resync)
2574
2575
2576 def PowercycleNode(hypervisor_type):
2577   """Hard-powercycle the node.
2578
2579   Because we need to return first, and schedule the powercycle in the
2580   background, we won't be able to report failures nicely.
2581
2582   """
2583   hyper = hypervisor.GetHypervisor(hypervisor_type)
2584   try:
2585     pid = os.fork()
2586   except OSError:
2587     # if we can't fork, we'll pretend that we're in the child process
2588     pid = 0
2589   if pid > 0:
2590     return "Reboot scheduled in 5 seconds"
2591   time.sleep(5)
2592   hyper.PowercycleNode()
2593
2594
2595 class HooksRunner(object):
2596   """Hook runner.
2597
2598   This class is instantiated on the node side (ganeti-noded) and not
2599   on the master side.
2600
2601   """
2602   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2603
2604   def __init__(self, hooks_base_dir=None):
2605     """Constructor for hooks runner.
2606
2607     @type hooks_base_dir: str or None
2608     @param hooks_base_dir: if not None, this overrides the
2609         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2610
2611     """
2612     if hooks_base_dir is None:
2613       hooks_base_dir = constants.HOOKS_BASE_DIR
2614     self._BASE_DIR = hooks_base_dir
2615
2616   @staticmethod
2617   def ExecHook(script, env):
2618     """Exec one hook script.
2619
2620     @type script: str
2621     @param script: the full path to the script
2622     @type env: dict
2623     @param env: the environment with which to exec the script
2624     @rtype: tuple (success, message)
2625     @return: a tuple of success and message, where success
2626         indicates the succes of the operation, and message
2627         which will contain the error details in case we
2628         failed
2629
2630     """
2631     # exec the process using subprocess and log the output
2632     fdstdin = None
2633     try:
2634       fdstdin = open("/dev/null", "r")
2635       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2636                                stderr=subprocess.STDOUT, close_fds=True,
2637                                shell=False, cwd="/", env=env)
2638       output = ""
2639       try:
2640         output = child.stdout.read(4096)
2641         child.stdout.close()
2642       except EnvironmentError, err:
2643         output += "Hook script error: %s" % str(err)
2644
2645       while True:
2646         try:
2647           result = child.wait()
2648           break
2649         except EnvironmentError, err:
2650           if err.errno == errno.EINTR:
2651             continue
2652           raise
2653     finally:
2654       # try not to leak fds
2655       for fd in (fdstdin, ):
2656         if fd is not None:
2657           try:
2658             fd.close()
2659           except EnvironmentError, err:
2660             # just log the error
2661             #logging.exception("Error while closing fd %s", fd)
2662             pass
2663
2664     return result == 0, utils.SafeEncode(output.strip())
2665
2666   def RunHooks(self, hpath, phase, env):
2667     """Run the scripts in the hooks directory.
2668
2669     @type hpath: str
2670     @param hpath: the path to the hooks directory which
2671         holds the scripts
2672     @type phase: str
2673     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2674         L{constants.HOOKS_PHASE_POST}
2675     @type env: dict
2676     @param env: dictionary with the environment for the hook
2677     @rtype: list
2678     @return: list of 3-element tuples:
2679       - script path
2680       - script result, either L{constants.HKR_SUCCESS} or
2681         L{constants.HKR_FAIL}
2682       - output of the script
2683
2684     @raise errors.ProgrammerError: for invalid input
2685         parameters
2686
2687     """
2688     if phase == constants.HOOKS_PHASE_PRE:
2689       suffix = "pre"
2690     elif phase == constants.HOOKS_PHASE_POST:
2691       suffix = "post"
2692     else:
2693       _Fail("Unknown hooks phase '%s'", phase)
2694
2695     rr = []
2696
2697     subdir = "%s-%s.d" % (hpath, suffix)
2698     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2699     try:
2700       dir_contents = utils.ListVisibleFiles(dir_name)
2701     except OSError:
2702       # FIXME: must log output in case of failures
2703       return rr
2704
2705     # we use the standard python sort order,
2706     # so 00name is the recommended naming scheme
2707     dir_contents.sort()
2708     for relname in dir_contents:
2709       fname = os.path.join(dir_name, relname)
2710       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2711           self.RE_MASK.match(relname) is not None):
2712         rrval = constants.HKR_SKIP
2713         output = ""
2714       else:
2715         result, output = self.ExecHook(fname, env)
2716         if not result:
2717           rrval = constants.HKR_FAIL
2718         else:
2719           rrval = constants.HKR_SUCCESS
2720       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2721
2722     return rr
2723
2724
2725 class IAllocatorRunner(object):
2726   """IAllocator runner.
2727
2728   This class is instantiated on the node side (ganeti-noded) and not on
2729   the master side.
2730
2731   """
2732   def Run(self, name, idata):
2733     """Run an iallocator script.
2734
2735     @type name: str
2736     @param name: the iallocator script name
2737     @type idata: str
2738     @param idata: the allocator input data
2739
2740     @rtype: tuple
2741     @return: two element tuple of:
2742        - status
2743        - either error message or stdout of allocator (for success)
2744
2745     """
2746     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2747                                   os.path.isfile)
2748     if alloc_script is None:
2749       _Fail("iallocator module '%s' not found in the search path", name)
2750
2751     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2752     try:
2753       os.write(fd, idata)
2754       os.close(fd)
2755       result = utils.RunCmd([alloc_script, fin_name])
2756       if result.failed:
2757         _Fail("iallocator module '%s' failed: %s, output '%s'",
2758               name, result.fail_reason, result.output)
2759     finally:
2760       os.unlink(fin_name)
2761
2762     return result.stdout
2763
2764
2765 class DevCacheManager(object):
2766   """Simple class for managing a cache of block device information.
2767
2768   """
2769   _DEV_PREFIX = "/dev/"
2770   _ROOT_DIR = constants.BDEV_CACHE_DIR
2771
2772   @classmethod
2773   def _ConvertPath(cls, dev_path):
2774     """Converts a /dev/name path to the cache file name.
2775
2776     This replaces slashes with underscores and strips the /dev
2777     prefix. It then returns the full path to the cache file.
2778
2779     @type dev_path: str
2780     @param dev_path: the C{/dev/} path name
2781     @rtype: str
2782     @return: the converted path name
2783
2784     """
2785     if dev_path.startswith(cls._DEV_PREFIX):
2786       dev_path = dev_path[len(cls._DEV_PREFIX):]
2787     dev_path = dev_path.replace("/", "_")
2788     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2789     return fpath
2790
2791   @classmethod
2792   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2793     """Updates the cache information for a given device.
2794
2795     @type dev_path: str
2796     @param dev_path: the pathname of the device
2797     @type owner: str
2798     @param owner: the owner (instance name) of the device
2799     @type on_primary: bool
2800     @param on_primary: whether this is the primary
2801         node nor not
2802     @type iv_name: str
2803     @param iv_name: the instance-visible name of the
2804         device, as in objects.Disk.iv_name
2805
2806     @rtype: None
2807
2808     """
2809     if dev_path is None:
2810       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2811       return
2812     fpath = cls._ConvertPath(dev_path)
2813     if on_primary:
2814       state = "primary"
2815     else:
2816       state = "secondary"
2817     if iv_name is None:
2818       iv_name = "not_visible"
2819     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2820     try:
2821       utils.WriteFile(fpath, data=fdata)
2822     except EnvironmentError, err:
2823       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2824
2825   @classmethod
2826   def RemoveCache(cls, dev_path):
2827     """Remove data for a dev_path.
2828
2829     This is just a wrapper over L{utils.RemoveFile} with a converted
2830     path name and logging.
2831
2832     @type dev_path: str
2833     @param dev_path: the pathname of the device
2834
2835     @rtype: None
2836
2837     """
2838     if dev_path is None:
2839       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2840       return
2841     fpath = cls._ConvertPath(dev_path)
2842     try:
2843       utils.RemoveFile(fpath)
2844     except EnvironmentError, err:
2845       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)