4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
# NOTE(review): tail fragment of _GetSshRunner — its "def" line (orig. ~47) is
# missing from this chunk; the function simply builds a fresh ssh.SshRunner.
48 return ssh.SshRunner()
# Remove every visible regular file directly inside `path` (non-recursive);
# symlinks and subdirectories are deliberately left in place.
51 def _CleanDirectory(path):
52 if not os.path.isdir(path):
# NOTE(review): orig. line 53 is missing here — presumably an early "return"
# when the path is not a directory; confirm against the full file.
54 for rel_name in utils.ListVisibleFiles(path):
55 full_name = os.path.join(path, rel_name)
56 if os.path.isfile(full_name) and not os.path.islink(full_name):
57 utils.RemoveFile(full_name)
# NOTE(review): fragment — the "def _GetMasterInfo():" line and the "try:"
# that pairs with the except below are missing (orig. numbering jumps 60/62-64).
61 """Return the master ip and netdev.
# Reads both values from the cluster's simple-store configuration.
65 ss = ssconf.SimpleStore()
66 master_netdev = ss.GetMasterNetdev()
67 master_ip = ss.GetMasterIP()
68 except errors.ConfigurationError, err:
69 logging.exception("Cluster configuration incomplete")
# Returns the pair (netdev, ip); error-path return (orig. 70) not visible here.
71 return (master_netdev, master_ip)
74 def StartMaster(start_daemons):
75 """Activate local node as master node.
77 The function will always try to activate the IP address of the master
78 (if someone else has it, then it won't). Then, if the start_daemons
79 parameter is True, it will also start the master daemons
80 (ganeti-masterd and ganeti-rapi).
# NOTE(review): orig. lines 81-87, 93, 95-96, 100, 102-103, 107, 109, 112 and
# 114-117 are missing from this chunk (docstring close, returns, RunCmd
# failure checks) — the control flow below is incomplete as shown.
84 master_netdev, master_ip = _GetMasterInfo()
# First TcpPing: is the master IP answering at all? Second (from localhost
# source): do *we* hold it, or does someone else?
88 if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
89 if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
90 source=constants.LOCALHOST_IP_ADDRESS):
91 # we already have the ip:
92 logging.debug("Already started")
94 logging.error("Someone else has the master ip, not activating")
# Attach the master IP as a /32 alias labelled "<netdev>:0".
97 result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
98 "dev", master_netdev, "label",
99 "%s:0" % master_netdev])
101 logging.error("Can't activate master IP: %s", result.output)
# Gratuitous ARP so neighbours update their caches quickly.
104 result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
105 "-s", master_ip, master_ip])
106 # we'll ignore the exit code of arping
108 # and now start the master and rapi daemons
110 for daemon in 'ganeti-masterd', 'ganeti-rapi':
111 result = utils.RunCmd([daemon])
113 logging.error("Can't start daemon %s: %s", daemon, result.output)
118 def StopMaster(stop_daemons):
119 """Deactivate this node as master.
121 The function will always try to deactivate the IP address of the
122 master. Then, if the stop_daemons parameter is True, it will also
123 stop the master daemons (ganeti-masterd and ganeti-rapi).
# NOTE(review): orig. lines 120, 124-125, 128-129, 132, 135-136 are missing
# (docstring close, early return, RunCmd failure test) — flow incomplete here.
126 master_netdev, master_ip = _GetMasterInfo()
127 if not master_netdev:
# Remove the /32 alias added by StartMaster.
130 result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
131 "dev", master_netdev])
133 logging.error("Can't remove the master IP, error: %s", result.output)
134 # but otherwise ignore the failure
137 # stop/kill the rapi and the master daemon
138 for daemon in constants.RAPI_PID, constants.MASTERD_PID:
139 utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
144 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
145 """Joins this node to the cluster.
147 This does the following:
148 - updates the hostkeys of the machine (rsa and dsa)
149 - adds the ssh private key to the user
150 - adds the ssh public key to the users' authorized_keys file
# NOTE(review): orig. lines 146, 151-152, 159-160, 162, 165-166, 169, 171,
# 173-177 are missing (docstring close, try:, return values, restart check).
# Overwrite the host keys with the ones supplied by the master.
153 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
154 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
155 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
156 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
157 for name, content, mode in sshd_keys:
158 utils.WriteFile(name, data=content, mode=mode)
# Locate the ganeti run-as user's key files; the call spans a missing line.
161 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
163 except errors.OpExecError, err:
164 logging.exception("Error while processing user ssh files")
# Install the user's ssh key pair (private key mode 0600).
167 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
168 utils.WriteFile(name, data=content, mode=0600)
170 utils.AddAuthorizedKey(auth_keys, sshpub)
# Restart sshd so the new host keys take effect.
172 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
# NOTE(review): fragment of LeaveCluster — the "def" line (orig. ~176-177) is
# missing from this chunk, as are several interior lines (179-180, 182-185,
# 189-190, 192, 194-196, 199).
178 """Cleans up the current node and prepares it to be removed from the cluster.
# Wipe the node's local ganeti state directory.
181 _CleanDirectory(constants.DATA_DIR)
186 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
187 except errors.OpExecError:
188 logging.exception("Error while processing ssh files")
# Remove our own public key from authorized_keys, then delete the key pair.
191 f = open(pub_key, 'r')
193 utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
197 utils.RemoveFile(priv_key)
198 utils.RemoveFile(pub_key)
200 # Return a reassuring string to the caller, and quit
# Raising QuitGanetiException makes the node daemon shut itself down after
# answering this RPC.
201 raise errors.QuitGanetiException(False, 'Shutdown scheduled')
204 def GetNodeInfo(vgname):
205 """Gives back a hash with different informations about the node.
208 { 'vg_size' : xxx, 'vg_free' : xxx, 'memory_domain0': xxx,
209 'memory_free' : xxx, 'memory_total' : xxx }
211 vg_size is the size of the configured volume group in MiB
212 vg_free is the free size of the volume group in MiB
213 memory_dom0 is the memory allocated for domain0 in MiB
214 memory_free is the currently available (free) ram in MiB
215 memory_total is the total number of ram in MiB
# NOTE(review): orig. lines 216-218, 222, 227, 229, 231-236 are missing — in
# particular the initialization of `outputarray`, the close of the boot_id
# file and the final return statement are not visible here.
219 vginfo = _GetVGInfo(vgname)
220 outputarray['vg_size'] = vginfo['vg_size']
221 outputarray['vg_free'] = vginfo['vg_free']
# Merge in hypervisor-reported node data (memory figures etc.).
223 hyper = hypervisor.GetHypervisor()
224 hyp_info = hyper.GetNodeInfo()
225 if hyp_info is not None:
226 outputarray.update(hyp_info)
# The kernel boot id changes on each reboot; used to detect node restarts.
228 f = open("/proc/sys/kernel/random/boot_id", 'r')
230 outputarray["bootid"] = f.read(128).rstrip("\n")
237 def VerifyNode(what):
238 """Verify the status of the local node.
241 what - a dictionary of things to check:
242 'filelist' : list of files for which to compute checksums
243 'nodelist' : list of nodes we should check communication with
244 'hypervisor': run the hypervisor-specific verify
246 Requested files on local node are checksummed and the result returned.
248 The nodelist is traversed, with the following checks being made
250 - known_hosts key correct
251 - correct resolving of node name (target node returns its own hostname
252 by ssh-execution of 'hostname', result compared against name in list.
# NOTE(review): orig. lines 253-256, 259, 262, 268, 275-279, 282-283, 286,
# 289, 292, 294-298 are missing — `result` initialization, the success-branch
# of VerifyNodeHostname, the my_pip/my_sip discovery loop body, and the final
# return are not visible in this chunk.
257 if 'hypervisor' in what:
258 result['hypervisor'] = hypervisor.GetHypervisor().Verify()
260 if 'filelist' in what:
261 result['filelist'] = utils.FingerprintFiles(what['filelist'])
263 if 'nodelist' in what:
264 result['nodelist'] = {}
# Shuffle so that not all nodes hammer the same peers in the same order.
265 random.shuffle(what['nodelist'])
266 for node in what['nodelist']:
267 success, message = _GetSshRunner().VerifyNodeHostname(node)
269 result['nodelist'][node] = message
270 if 'node-net-test' in what:
271 result['node-net-test'] = {}
272 my_name = utils.HostInfo().name
273 my_pip = my_sip = None
# First pass: find our own primary/secondary IPs in the supplied list.
274 for name, pip, sip in what['node-net-test']:
280 result['node-net-test'][my_name] = ("Can't find my own"
281 " primary/secondary IP"
# Second pass: TCP-ping every node's primary and secondary IP from ours.
284 port = ssconf.SimpleStore().GetNodeDaemonPort()
285 for name, pip, sip in what['node-net-test']:
287 if not utils.TcpPing(pip, port, source=my_pip):
288 fail.append("primary")
290 if not utils.TcpPing(sip, port, source=my_sip):
291 fail.append("secondary")
293 result['node-net-test'][name] = ("failure using the %s"
300 def GetVolumeList(vg_name):
301 """Compute list of logical volumes and their size.
304 dictionary of all partions (key) with their size (in MiB), inactive
306 {'test1': ('20.06', True, True)}
# NOTE(review): orig. lines 302-303, 305, 307-310, 314, 316-318, 321, 323,
# 325, 330-331 are missing — `lvs = {}` initialization, error return and the
# final `return lvs` are not visible in this chunk.
# Query LVM for name, size and attributes of every LV in the volume group.
311 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
312 "--separator=%s" % sep,
313 "-olv_name,lv_size,lv_attr", vg_name])
315 logging.error("Failed to list logical volumes, lvs output: %s",
319 valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
320 for line in result.stdout.splitlines():
322 match = valid_line_re.match(line)
324 logging.error("Invalid line returned from lvs output: '%s'", line)
326 name, size, attr = match.groups()
# lv_attr positions: index 4 is state ('-' = inactive), index 5 is device
# open flag ('o' = open).
327 inactive = attr[4] == '-'
328 online = attr[5] == 'o'
329 lvs[name] = (size, inactive, online)
# Thin wrapper delegating to utils.ListVolumeGroups.
334 def ListVolumeGroups():
335 """List the volume groups and their size.
338 Dictionary with keys volume name and values the size of the volume
# NOTE(review): docstring closing quotes (orig. ~340) missing from this chunk.
341 return utils.ListVolumeGroups()
# NOTE(review): fragment of NodeVolumes — the "def" line (orig. ~343-344) and
# several interior lines (346-347, 349, 351, 353-357, 359-363, 368-369) are
# missing, including the map_line/parse_dev helper "def" lines.
345 """List all volumes on this node.
348 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
350 "--options=lv_name,lv_size,devices,vg_name"])
352 logging.error("Failed to list logical volumes, lvs output: %s",
# Strip the "(extent)" suffix lvs appends to device names.
358 return dev.split('(')[0]
364 'name': line[0].strip(),
365 'size': line[1].strip(),
366 'dev': parse_dev(line[2].strip()),
367 'vg': line[3].strip(),
# Only lines with at least 4 pipe-separated fields are valid lvs output.
370 return [map_line(line.split('|')) for line in result.stdout.splitlines()
371 if line.count('|') >= 3]
374 def BridgesExist(bridges_list):
375 """Check if a list of bridges exist on the current node.
378 True if all of them exist, false otherwise
# NOTE(review): orig. lines 376-377, 379-380, 383-385 missing — docstring
# close and the return statements are not visible in this chunk.
381 for bridge in bridges_list:
382 if not utils.BridgeExists(bridge):
388 def GetInstanceList():
389 """Provides a list of instances.
392 A list of all running instances on the current node
393 - instance1.example.com
394 - instance2.example.com
# NOTE(review): orig. lines 390-391, 395-397, 401-403 missing — the try:,
# error re-raise and the final `return names` are not visible here.
398 names = hypervisor.GetHypervisor().ListInstances()
399 except errors.HypervisorError, err:
400 logging.exception("Error enumerating instances")
406 def GetInstanceInfo(instance):
407 """Gives back the informations about an instance as a dictionary.
410 instance: name of the instance (ex. instance1.example.com)
413 { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
415 memory: memory size of instance (int)
416 state: xen state of instance (string)
417 time: cpu time of instance (float)
# NOTE(review): orig. lines 408-409, 411-412, 414, 418-421, 427-428 missing —
# `output = {}` and the return statement are not visible in this chunk.
422 iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
# iinfo tuple layout (presumably name/id/memory/vcpus/state/time — confirm
# against the hypervisor abstraction): indices 2, 4, 5 used below.
423 if iinfo is not None:
424 output['memory'] = iinfo[2]
425 output['state'] = iinfo[4]
426 output['time'] = iinfo[5]
431 def GetAllInstancesInfo():
432 """Gather data about all instances.
434 This is the equivalent of `GetInstanceInfo()`, except that it
435 computes data for all instances at once, thus being faster if one
436 needs data about more than one instance.
438 Returns: a dictionary of dictionaries, keys being the instance name,
440 { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
442 memory: memory size of instance (int)
443 state: xen state of instance (string)
444 time: cpu time of instance (float)
445 vcpus: the number of cpus
# NOTE(review): orig. lines 446-449, 451 and everything from 453 onwards are
# missing — the loop body that builds the output dict and the return are not
# visible in this chunk.
450 iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
452 for name, inst_id, memory, vcpus, state, times in iinfo:
463 def AddOSToInstance(instance, os_disk, swap_disk):
464 """Add an OS to an instance.
467 instance: the instance object
468 os_disk: the instance-visible name of the os device
469 swap_disk: the instance-visible name of the swap device
# NOTE(review): orig. lines 465-466, 470-471, 473, 475, 479-480, 484-485,
# 489-491, 495-497, 502, 506-507, 509, 512-515 are missing — the error
# returns after each check and the final success return are not visible.
472 inst_os = OSFromDisk(instance.os)
474 create_script = inst_os.create_script
# Resolve the instance-visible disk names to config disk objects.
476 os_device = instance.FindDisk(os_disk)
477 if os_device is None:
478 logging.error("Can't find this device-visible name '%s'", os_disk)
481 swap_device = instance.FindDisk(swap_disk)
482 if swap_device is None:
483 logging.error("Can't find this device-visible name '%s'", swap_disk)
# Then resolve the config objects to actual assembled block devices.
486 real_os_dev = _RecursiveFindBD(os_device)
487 if real_os_dev is None:
488 raise errors.BlockDeviceError("Block device '%s' is not set up" %
492 real_swap_dev = _RecursiveFindBD(swap_device)
493 if real_swap_dev is None:
494 raise errors.BlockDeviceError("Block device '%s' is not set up" %
# Per-run log file for the OS create script's output.
498 logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
499 instance.name, int(time.time()))
500 if not os.path.exists(constants.LOG_OS_DIR):
501 os.mkdir(constants.LOG_OS_DIR, 0750)
503 command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
504 inst_os.path, create_script, instance.name,
505 real_os_dev.dev_path, real_swap_dev.dev_path,
508 result = utils.RunCmd(command)
510 logging.error("os create command '%s' returned error: %s, logfile: %s,"
511 " output: %s", command, result.fail_reason, logfile,
518 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
519 """Run the OS rename script for an instance.
522 instance: the instance object
523 old_name: the old name of the instance
524 os_disk: the instance-visible name of the os device
525 swap_disk: the instance-visible name of the swap device
# NOTE(review): orig. lines 520-521, 526-527, 529, 531, 535-536, 540-541,
# 545-547, 551-553, 555, 559, 563-564, 566-567, 570-572 are missing — error
# returns and the final success return are not visible in this chunk.
# Structure mirrors AddOSToInstance: resolve disks, then run the OS script.
528 inst_os = OSFromDisk(instance.os)
530 script = inst_os.rename_script
532 os_device = instance.FindDisk(os_disk)
533 if os_device is None:
534 logging.error("Can't find this device-visible name '%s'", os_disk)
537 swap_device = instance.FindDisk(swap_disk)
538 if swap_device is None:
539 logging.error("Can't find this device-visible name '%s'", swap_disk)
542 real_os_dev = _RecursiveFindBD(os_device)
543 if real_os_dev is None:
544 raise errors.BlockDeviceError("Block device '%s' is not set up" %
548 real_swap_dev = _RecursiveFindBD(swap_device)
549 if real_swap_dev is None:
550 raise errors.BlockDeviceError("Block device '%s' is not set up" %
554 logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
556 instance.name, int(time.time()))
557 if not os.path.exists(constants.LOG_OS_DIR):
558 os.mkdir(constants.LOG_OS_DIR, 0750)
# Rename script takes both the old (-o) and new (-n) instance names.
560 command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
561 inst_os.path, script, old_name, instance.name,
562 real_os_dev.dev_path, real_swap_dev.dev_path,
565 result = utils.RunCmd(command)
# NOTE(review): message says "os create command" but this is the rename
# script — likely a copy/paste slip; fixing it would change a runtime string.
568 logging.error("os create command '%s' returned error: %s output: %s",
569 command, result.fail_reason, result.output)
575 def _GetVGInfo(vg_name):
576 """Get informations about the volume group.
579 vg_name: the volume group
582 { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
584 vg_size is the total size of the volume group in MiB
585 vg_free is the free size of the volume group in MiB
586 pv_count are the number of physical disks in that vg
588 If an error occurs during gathering of data, we return the same dict
589 with keys all set to None.
# NOTE(review): orig. lines 590-591, 593, 596-597, 599, 601-603, 607, 610,
# 613 are missing — error-path returns and the final return are not visible.
# Start with all values None; fill them in only on success.
592 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
594 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
595 "--nosuffix", "--units=m", "--separator=:", vg_name])
598 logging.error("volume group %s not present", vg_name)
600 valarr = retval.stdout.strip().rstrip(':').split(':')
# Round the MiB figures to integers; vgs reports them as floats.
604 "vg_size": int(round(float(valarr[0]), 0)),
605 "vg_free": int(round(float(valarr[1]), 0)),
606 "pv_count": int(valarr[2]),
608 except ValueError, err:
609 logging.exception("Fail to parse vgs output")
611 logging.error("vgs output has the wrong number of fields (expected"
612 " three): %s", str(valarr))
616 def _GatherBlockDevs(instance):
617 """Set up an instance's block device(s).
619 This is run on the primary node at instance startup. The block
620 devices must be already assembled.
# NOTE(review): orig. lines 618, 621-623, 626, 628-629, 631-632 are missing —
# `block_devices = []`, the None-check if, and the return are not visible.
624 for disk in instance.disks:
625 device = _RecursiveFindBD(disk)
627 raise errors.BlockDeviceError("Block device '%s' is not set up." %
# Collect (config disk, real device) pairs for the hypervisor.
630 block_devices.append((disk, device))
634 def StartInstance(instance, extra_args):
635 """Start an instance.
638 instance - name of instance to start.
# NOTE(review): orig. lines 636-637, 639-640, 642, 644-645, 648-649, 653-655
# are missing — the already-running early return, try:, and return values are
# not visible in this chunk.
641 running_instances = GetInstanceList()
# Idempotence: starting an already-running instance is not an error.
643 if instance.name in running_instances:
646 block_devices = _GatherBlockDevs(instance)
647 hyper = hypervisor.GetHypervisor()
650 hyper.StartInstance(instance, block_devices, extra_args)
651 except errors.HypervisorError, err:
652 logging.exception("Failed to start instance")
658 def ShutdownInstance(instance):
659 """Shut an instance down.
662 instance - name of instance to shutdown.
# NOTE(review): orig. lines 660-661, 663-664, 666, 668-669, 671, 675-676,
# 678-680, 683-685, 688-689, 693-695, 698-700 are missing — early returns,
# try: lines, the sleep between polls, and final returns are not visible.
665 running_instances = GetInstanceList()
# Already stopped: nothing to do.
667 if instance.name not in running_instances:
670 hyper = hypervisor.GetHypervisor()
672 hyper.StopInstance(instance)
673 except errors.HypervisorError, err:
674 logging.error("Failed to stop instance")
677 # test every 10secs for 2min
# Poll for the graceful shutdown to complete before forcing destroy.
681 for dummy in range(11):
682 if instance.name not in GetInstanceList():
686 # the shutdown did not succeed
687 logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
690 hyper.StopInstance(instance, force=True)
691 except errors.HypervisorError, err:
692 logging.exception("Failed to stop instance")
696 if instance.name in GetInstanceList():
697 logging.error("could not shutdown instance '%s' even by destroy",
704 def RebootInstance(instance, reboot_type, extra_args):
705 """Reboot an instance.
708 instance - name of instance to reboot
709 reboot_type - how to reboot [soft,hard,full]
# NOTE(review): orig. lines 706-707, 710-713, 716-717, 720, 724, 726, 731-732
# and 734-736 are missing — return statements and try: lines not visible.
712 running_instances = GetInstanceList()
714 if instance.name not in running_instances:
715 logging.error("Cannot reboot instance that is not running")
718 hyper = hypervisor.GetHypervisor()
# Soft reboot: delegate to the hypervisor's own reboot.
719 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
721 hyper.RebootInstance(instance)
722 except errors.HypervisorError, err:
723 logging.exception("Failed to soft reboot instance")
# Hard reboot: full stop followed by a fresh start.
725 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
727 ShutdownInstance(instance)
728 StartInstance(instance, extra_args)
729 except errors.HypervisorError, err:
730 logging.exception("Failed to hard reboot instance")
# Anything else (including "full", which is handled elsewhere) is rejected.
733 raise errors.ParameterError("reboot_type invalid")
739 def MigrateInstance(instance, target, live):
740 """Migrates an instance to another node.
# NOTE(review): orig. lines 741-742, 744-745, 749-750 are missing — docstring
# body, try:, and the failure return are not visible in this chunk.
743 hyper = hypervisor.GetHypervisor()
746 hyper.MigrateInstance(instance, target, live)
747 except errors.HypervisorError, err:
748 msg = "Failed to migrate instance: %s" % str(err)
# Returns a (success, message) tuple. NOTE(review): "successfull" is a typo
# in a runtime string; left as-is since callers/tests may compare it.
751 return (True, "Migration successfull")
754 def CreateBlockDevice(disk, size, owner, on_primary, info):
755 """Creates a block device for an instance.
758 disk: a ganeti.objects.Disk object
759 size: the size of the physical underlying device
760 owner: a string with the name of the instance
761 on_primary: a boolean indicating if it is the primary node or not
762 info: string that will be sent to the physical device creation
765 the new unique_id of the device (this can sometime be
766 computed only after creation), or None. On secondary nodes,
767 it's not required to return anything.
# NOTE(review): orig. lines 756-757, 763-764, 768-771, 776-779, 783, 785-786,
# 788-789, 791, 803-805, 807 are missing — `clist = []` init, the child
# dev_path collection, device.Remove() call, SetInfo and the final return are
# not visible in this chunk.
# Recursively create/assemble children first, collecting them for the parent.
772 for child in disk.children:
773 crdev = _RecursiveAssembleBD(child, owner, on_primary)
774 if on_primary or disk.AssembleOnSecondary():
775 # we need the children open in case the device itself has to
# If a stale device with this id already exists, remove it first.
780 device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
781 if device is not None:
782 logging.info("removing existing device %s", disk)
784 except errors.BlockDeviceError, err:
787 device = bdev.Create(disk.dev_type, disk.physical_id,
790 raise ValueError("Can't create child device for %s, %s" %
792 if on_primary or disk.AssembleOnSecondary():
793 if not device.Assemble():
794 errorstring = "Can't assemble device after creation"
795 logging.error(errorstring)
796 raise errors.BlockDeviceError("%s, very unusual event - check the node"
797 " daemon logs" % errorstring)
798 device.SetSyncSpeed(constants.SYNC_SPEED)
# Open read-write only where the instance will actually run/use it.
799 if on_primary or disk.OpenOnSecondary():
800 device.Open(force=True)
801 DevCacheManager.UpdateCache(device.dev_path, owner,
802 on_primary, disk.iv_name)
806 physical_id = device.unique_id
810 def RemoveBlockDevice(disk):
811 """Remove a block device.
813 This is intended to be called recursively.
# NOTE(review): orig. lines 812, 814-816, 823-824, 827, 829-831, 834-835 are
# missing — try:, the rdev-is-None branch, and the return are not visible.
817 # since we are removing the device, allow a partial match
818 # this allows removal of broken mirrors
819 rdev = _RecursiveFindBD(disk, allow_partial=True)
820 except errors.BlockDeviceError, err:
821 # probably can't attach
822 logging.info("Can't attach to device %s in remove", disk)
825 r_path = rdev.dev_path
826 result = rdev.Remove()
# Keep the device cache consistent after a successful removal.
828 DevCacheManager.RemoveCache(r_path)
# Recurse into children, and-ing the results together.
832 for child in disk.children:
833 result = result and RemoveBlockDevice(child)
837 def _RecursiveAssembleBD(disk, owner, as_primary):
838 """Activate a block device for an instance.
840 This is run on the primary and secondary nodes for an instance.
842 This function is called recursively.
845 disk: a objects.Disk object
846 as_primary: if we should make the block device read/write
849 the assembled device or None (in case no device was assembled)
851 If the assembly is not successful, an exception is raised.
# NOTE(review): orig. lines 839, 841, 843-844, 847-848, 850, 852-855, 857,
# 859, 862, 866-867, 870, 874, 876, 879-884 are missing — `children = []`
# init, the re-raise when too many children failed, r_dev.Open(), the
# secondary-node return and final return are not visible in this chunk.
# mcn = how many children may fail (be None) before assembly must abort.
856 mcn = disk.ChildrenNeeded()
858 mcn = 0 # max number of Nones allowed
860 mcn = len(disk.children) - mcn # max number of Nones
861 for chld_disk in disk.children:
863 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
864 except errors.BlockDeviceError, err:
865 if children.count(None) >= mcn:
868 logging.debug("Error in child activation: %s", str(err))
869 children.append(cdev)
871 if as_primary or disk.AssembleOnSecondary():
872 r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
873 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
875 if as_primary or disk.OpenOnSecondary():
877 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
878 as_primary, disk.iv_name)
885 def AssembleBlockDevice(disk, owner, as_primary):
886 """Activate a block device for an instance.
888 This is a wrapper over _RecursiveAssembleBD.
891 a /dev path for primary nodes
892 True for secondary nodes
# NOTE(review): orig. lines 887, 889-890, 893-894, 898 are missing — the
# final `return result` is not visible in this chunk.
895 result = _RecursiveAssembleBD(disk, owner, as_primary)
# On the primary the caller wants a /dev path, not the BlockDev object.
896 if isinstance(result, bdev.BlockDev):
897 result = result.dev_path
901 def ShutdownBlockDevice(disk):
902 """Shut down a block device.
904 First, if the device is assembled (can `Attach()`), then the device
905 is shutdown. Then the children of the device are shutdown.
907 This function is called recursively. Note that we don't cache the
908 children or such, as oppossed to assemble, shutdown of different
909 devices doesn't require that the upper device was active.
# NOTE(review): orig. lines 903, 906, 910-911, 916, 918-920, 923-924 are
# missing — the not-found branch and the return statement are not visible.
912 r_dev = _RecursiveFindBD(disk)
913 if r_dev is not None:
914 r_path = r_dev.dev_path
915 result = r_dev.Shutdown()
# Drop the cache entry once the device is gone.
917 DevCacheManager.RemoveCache(r_path)
# Recurse into children, and-ing the results.
921 for child in disk.children:
922 result = result and ShutdownBlockDevice(child)
926 def MirrorAddChildren(parent_cdev, new_cdevs):
927 """Extend a mirrored block device.
# NOTE(review): orig. lines 928-929, 933, 938, 940 are missing — docstring
# close, the error returns and the final return are not visible here.
# allow_partial: the mirror may be degraded, that's why we're extending it.
930 parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
931 if parent_bdev is None:
932 logging.error("Can't find parent device")
934 new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
935 if new_bdevs.count(None) > 0:
936 logging.error("Can't find new device(s) to add: %s:%s",
937 new_bdevs, new_cdevs)
939 parent_bdev.AddChildren(new_bdevs)
943 def MirrorRemoveChildren(parent_cdev, new_cdevs):
944 """Shrink a mirrored block device.
# NOTE(review): orig. lines 945-946, 950-951, 954, 956, 958-960, 962-963,
# 965 are missing — `devs = []` init, the static-path branch, error return
# and final return are not visible in this chunk.
947 parent_bdev = _RecursiveFindBD(parent_cdev)
948 if parent_bdev is None:
949 logging.error("Can't find parent in remove children: %s", parent_cdev)
# Prefer a device's static path; fall back to dynamically finding it.
952 for disk in new_cdevs:
953 rpath = disk.StaticDevPath()
955 bd = _RecursiveFindBD(disk)
957 logging.error("Can't find dynamic device %s while removing children",
961 devs.append(bd.dev_path)
964 parent_bdev.RemoveChildren(devs)
968 def GetMirrorStatus(disks):
969 """Get the mirroring status of a list of devices.
972 disks: list of `objects.Disk`
975 list of (mirror_done, estimated_time) tuples, which
976 are the result of bdev.BlockDevice.CombinedSyncStatus()
# NOTE(review): orig. lines 970-971, 973-974, 977-980, 982, 985-986 are
# missing — `stats = []` init, the loop header and return are not visible.
981 rbd = _RecursiveFindBD(dsk)
983 raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
984 stats.append(rbd.CombinedSyncStatus())
988 def _RecursiveFindBD(disk, allow_partial=False):
989 """Check if a device is activated.
991 If so, return informations about the real device.
994 disk: the objects.Disk instance
995 allow_partial: don't abort the find if a child of the
996 device can't be found; this is intended to be
997 used when repairing mirrors
1000 None if the device can't be found
1001 otherwise the device instance
# NOTE(review): orig. lines 990, 992-993, 998-999, 1002-1005, 1008 are
# missing — `children = []` init is not visible in this chunk. Also note
# allow_partial is accepted but not visibly passed on to FindDevice or the
# recursive call here — confirm against the full file.
1006 for chdisk in disk.children:
1007 children.append(_RecursiveFindBD(chdisk))
1009 return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1012 def FindBlockDevice(disk):
1013 """Check if a device is activated.
1015 If so, return informations about the real device.
1018 disk: the objects.Disk instance
1020 None if the device can't be found
1021 (device_path, major, minor, sync_percent, estimated_time, is_degraded)
# NOTE(review): orig. lines 1014, 1016-1017, 1019, 1022-1023, 1025-1026 are
# missing — the None-check before using rbd is not visible in this chunk.
1024 rbd = _RecursiveFindBD(disk)
1027 return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def _IsJobQueueFile(file_name):
  """Check whether a path lies inside the job queue directory.

  Args:
    file_name: path to test (expected absolute, like QUEUE_DIR itself)

  Returns:
    True if file_name is the queue directory itself or any path below it.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # The previous implementation used os.path.commonprefix, which compares
  # strings character by character: a sibling such as "<queue_dir>-old/x"
  # or an un-normalized "<queue_dir>/../x" would wrongly be accepted.
  # Normalize and compare whole path components instead.
  norm_name = os.path.normpath(file_name)
  return (norm_name == queue_dir or
          norm_name.startswith(queue_dir + os.sep))
1033 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1034 """Write a file to the filesystem.
1036 This allows the master to overwrite(!) a file. It will only perform
1037 the operation if the file belongs to a list of configuration files.
# NOTE(review): orig. lines 1035, 1038-1039, 1042-1045, 1050, 1052,
# 1056-1057, 1060-1061 are missing — the `allowed_files = [` opener, the
# error returns and the final return are not visible in this chunk.
1040 if not os.path.isabs(file_name):
1041 logging.error("Filename passed to UploadFile is not absolute: '%s'",
# Whitelist of overwritable config files, plus the ssconf file list and
# anything inside the job queue directory.
1046 constants.CLUSTER_CONF_FILE,
1047 constants.ETC_HOSTS,
1048 constants.SSH_KNOWN_HOSTS_FILE,
1049 constants.VNC_PASSWORD_FILE,
1051 allowed_files.extend(ssconf.SimpleStore().GetFileList())
1053 if not (file_name in allowed_files or _IsJobQueueFile(file_name)):
1054 logging.error("Filename passed to UploadFile not in allowed"
1055 " upload targets: '%s'", file_name)
# Atomic write preserving the requested ownership, mode and timestamps.
1058 utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1059 atime=atime, mtime=mtime)
1063 def _ErrnoOrStr(err):
1064 """Format an EnvironmentError exception.
1066 If the `err` argument has an errno attribute, it will be looked up
1067 and converted into a textual EXXXX description. Otherwise the string
1068 representation of the error will be returned.
# NOTE(review): orig. lines 1065, 1069-1070, 1073-1077 are missing — the else
# branch (str(err) fallback) and the return are not visible in this chunk.
1071 if hasattr(err, 'errno'):
1072 detail = errno.errorcode[err.errno]
1078 def _OSOndiskVersion(name, os_dir):
1079 """Compute and return the API version of a given OS.
1081 This function will try to read the API version of the os given by
1082 the 'name' parameter and residing in the 'os_dir' directory.
1084 Return value will be either an integer denoting the version or None in the
1085 case when this is not a valid OS name.
# NOTE(review): orig. lines 1080, 1083, 1086-1087, 1089-1090, 1095,
# 1098-1102, 1104-1105, 1109, 1111, 1116-1117 are missing — try: lines, the
# file open/close, and the final return are not visible in this chunk.
1088 api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
# First stat the version file so we can give precise errors (missing vs.
# not a regular file) before attempting to read it.
1091 st = os.stat(api_file)
1092 except EnvironmentError, err:
1093 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1094 " found (%s)" % _ErrnoOrStr(err))
1096 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1097 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
# Cap the read at 256 bytes: the file should hold a single small integer.
1103 api_version = f.read(256)
1106 except EnvironmentError, err:
1107 raise errors.InvalidOS(name, os_dir, "error while reading the"
1108 " API version (%s)" % _ErrnoOrStr(err))
1110 api_version = api_version.strip()
1112 api_version = int(api_version)
1113 except (TypeError, ValueError), err:
1114 raise errors.InvalidOS(name, os_dir,
1115 "API version is not integer (%s)" % str(err))
1120 def DiagnoseOS(top_dirs=None):
1121 """Compute the validity for all OSes.
1123 Returns an OS object for each name in all the given top directories
1124 (if not given defaults to constants.OS_SEARCH_PATH)
# NOTE(review): orig. lines 1122, 1125-1129, 1132-1133, 1136, 1140, 1142,
# 1147-1148 are missing — `result = []` init, the continue after a listing
# error, and the final return are not visible in this chunk.
1130 if top_dirs is None:
1131 top_dirs = constants.OS_SEARCH_PATH
1134 for dir_name in top_dirs:
1135 if os.path.isdir(dir_name):
1137 f_names = utils.ListVisibleFiles(dir_name)
1138 except EnvironmentError, err:
1139 logging.exception("Can't list the OS directory %s", dir_name)
1141 for name in f_names:
# Invalid OSes are reported too, as objects built from the exception.
1143 os_inst = OSFromDisk(name, base_dir=dir_name)
1144 result.append(os_inst)
1145 except errors.InvalidOS, err:
1146 result.append(objects.OS.FromInvalidOS(err))
1151 def OSFromDisk(name, base_dir=None):
1152 """Create an OS instance from disk.
1154 This function will return an OS instance if the given name is a
1155 valid OS name. Otherwise, it will raise an appropriate
1156 `errors.InvalidOS` exception, detailing why this is not a valid
1160 os_dir: Directory containing the OS scripts. Defaults to a search
1161 in all the OS_SEARCH_PATH directories.
# NOTE(review): orig. lines 1153, 1157-1159, 1162-1164, 1167, 1169, 1171,
# 1173, 1178, 1181, 1184-1185, 1190, 1193-1194, 1197-1199 are missing — the
# else branch for base_dir, the os_dir-not-found test, and parts of the
# script validation are not visible in this chunk.
1165 if base_dir is None:
1166 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1168 raise errors.InvalidOS(name, None, "OS dir not found in search path")
1170 os_dir = os.path.sep.join([base_dir, name])
1172 api_version = _OSOndiskVersion(name, os_dir)
1174 if api_version != constants.OS_API_VERSION:
1175 raise errors.InvalidOS(name, os_dir, "API version mismatch"
1176 " (found %s want %s)"
1177 % (api_version, constants.OS_API_VERSION))
1179 # OS Scripts dictionary, we will populate it with the actual script names
1180 os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
# Each script must exist, be a regular file, and be executable by owner.
1182 for script in os_scripts:
1183 os_scripts[script] = os.path.sep.join([os_dir, script])
1186 st = os.stat(os_scripts[script])
1187 except EnvironmentError, err:
1188 raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1189 (script, _ErrnoOrStr(err)))
1191 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1192 raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1195 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1196 raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1200 return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1201 create_script=os_scripts['create'],
1202 export_script=os_scripts['export'],
1203 import_script=os_scripts['import'],
1204 rename_script=os_scripts['rename'],
1205 api_version=api_version)
1208 def GrowBlockDevice(disk, amount):
1209 """Grow a stack of block devices.
1211 This function is called recursively, with the childrens being the
1215 disk: the disk to be grown
1217 Returns: a tuple of (status, result), with:
1218 status: the result (true/false) of the operation
1219 result: the error message if the operation failed, otherwise not used
# NOTE(review): orig. lines 1210, 1212-1214, 1216, 1220-1221, 1223,
# 1225-1227, 1230-1231 are missing — the None-check on r_dev, the Grow()
# call inside its try:, and the success return are not visible here.
1222 r_dev = _RecursiveFindBD(disk)
1224 return False, "Cannot find block device %s" % (disk,)
1228 except errors.BlockDeviceError, err:
1229 return False, str(err)
1234 def SnapshotBlockDevice(disk):
1235 """Create a snapshot copy of a block device.
1237 This function is called recursively, and the snapshot is actually created
1238 just for the leaf lvm backend device.
1241 disk: the disk to be snapshotted
1244 a config entry for the actual lvm device snapshotted.
# NOTE(review): orig. lines 1236, 1239-1240, 1242-1243, 1245-1247, 1251,
# 1262-1264 are missing — the outer dispatch on disk.dev_type and the
# error branch when no matching child/device is found are not visible.
# Non-leaf devices: recurse into a child of the same size until we reach
# the underlying LV, which is the device actually snapshotted.
1248 if len(disk.children) == 1:
1249 # only one child, let's recurse on it
1250 return SnapshotBlockDevice(disk.children[0])
1252 # more than one child, choose one that matches
1253 for child in disk.children:
1254 if child.size == disk.size:
1255 # return implies breaking the loop
1256 return SnapshotBlockDevice(child)
1257 elif disk.dev_type == constants.LD_LV:
1258 r_dev = _RecursiveFindBD(disk)
1259 if r_dev is not None:
1260 # let's stay on the safe side and ask for the full size, for now
1261 return r_dev.Snapshot(disk.size)
1265 raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1266 " '%s' of type '%s'" %
1267 (disk.unique_id, disk.dev_type))
1270 def ExportSnapshot(disk, dest_node, instance):
1271 """Export a block device snapshot to a remote node.
1274 disk: the snapshot block device
1275 dest_node: the node to send the image to
1276 instance: instance being exported
1279 True if successful, False otherwise.
# NOTE(review): orig. lines 1272-1273, 1277-1278, 1280-1281, 1284, 1289,
# 1293-1295, 1298, 1301-1302, 1306-1308, 1312-1313, 1316, 1318-1319,
# 1322-1324 are missing — the comprcmd definition, the BuildCmd argument
# list, the RunCmd failure return and the success return are not visible.
1282 inst_os = OSFromDisk(instance.os)
1283 export_script = inst_os.export_script
1285 logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1286 instance.name, int(time.time()))
1287 if not os.path.exists(constants.LOG_OS_DIR):
1288 os.mkdir(constants.LOG_OS_DIR, 0750)
1290 real_os_dev = _RecursiveFindBD(disk)
1291 if real_os_dev is None:
1292 raise errors.BlockDeviceError("Block device '%s' is not set up" %
# Destination: a ".new" staging directory on the target node; the dump file
# is named after the snapshot LV.
1296 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1297 destfile = disk.physical_id[1]
1299 # the target command is built out of three individual commands,
1300 # which are joined by pipes; we check each individual command for
1303 expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1304 export_script, instance.name,
1305 real_os_dev.dev_path, logfile)
1309 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1310 destdir, destdir, destfile)
1311 remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1314 # all commands have been checked, so we're safe to combine them
1315 command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1317 result = utils.RunCmd(command)
1320 logging.error("os snapshot export command '%s' returned error: %s"
1321 " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Builds the export's ini-style configuration file in the ".new"
  staging directory, then replaces any previous export by moving the
  staging directory to its final name.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
  # TODO: redundant: on load can read nics until it doesn't exist
  # NOTE(review): 'nic_count' stores the last loop index, not the number
  # of NICs — confirm the loader compensates for this off-by-one
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
  # NOTE(review): same last-index-vs-count caveat as nic_count above
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')

  # replace any previous export with the freshly staged one; the True
  # argument makes rmtree ignore errors (e.g. no previous export)
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  # NOTE(review): the call reading cff into config appears elided in
  # this view

  # both the export-level and the instance-level sections must exist
  # for the export to be considered valid
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
  """Import an os image into an instance.

  The image is streamed from src_node over ssh, decompressed, and fed
  to the instance OS's import script against the real block devices.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  # translate instance-visible disk names into disk objects
  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    # NOTE(review): the continuation of this raise's argument is not
    # visible in this view
    raise errors.BlockDeviceError("Block device '%s' is not set up" %

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
  real_swap_dev.Open()

  # per-run log file capturing the import script's output
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # source side: cat the image file on src_node, run via ssh
  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,

  # destination side: run the OS import script on the real devices
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])

  result = utils.RunCmd(command)

  # NOTE(review): presumably guarded by a result.failed check on an
  # elided line above — verify
  logging.error("os import command '%s' returned error: %s"
                " output: %s", command, result.fail_reason, result.output)
  """Return a list of exports currently available on this machine.

  """
  # NOTE(review): the enclosing "def" line is not visible in this view;
  # per the docstring this is the export-listing entry point
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  # exports live directly under EXPORT_DIR, named after the instance
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of (disk, new_unique_id) tuples, as
  unpacked by the loop below. The return value will be a combined
  boolean result (True only if all renames succeeded).

  """
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    old_rpath = dev.dev_path
    dev.Rename(unique_id)
    new_rpath = dev.dev_path
    # if the device path changed, the old cache entry is now stale
    if old_rpath != new_rpath:
      DevCacheManager.RemoveCache(old_rpath)
      # FIXME: we should add the new cache information here, like:
      # DevCacheManager.UpdateCache(new_rpath, owner, ...)
      # but we don't have the owner here - maybe parse from existing
      # cache? for now, we only lose lvm data when we rename, which
      # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
  # the normalized path is valid only if it still lies under the
  # cluster's base file storage directory
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    # NOTE(review): per the docstring this branch should yield None;
    # the corresponding return appears elided in this view
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  # reject paths outside the cluster's base file storage directory
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)

  if not file_storage_dir:

  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)

    # create the directory (and any missing parents) with mode 0750
    os.makedirs(file_storage_dir, 0750)
  except OSError, err:
    logging.error("Cannot create file storage directory '%s': %s",
                  file_storage_dir, err)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  # reject paths outside the cluster's base file storage directory
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)

  if not file_storage_dir:

  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
    # deletes dir only if empty, otherwise we want to return False
    os.rmdir(file_storage_dir)
  except OSError, err:
    logging.exception("Cannot remove file storage directory '%s'",
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  # both endpoints must lie under the cluster's base storage directory
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)

  if not old_file_storage_dir or not new_file_storage_dir:

  # only rename when the destination does not already exist
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        logging.exception("Cannot rename '%s' to '%s'",
                          old_file_storage_dir, new_file_storage_dir)
    logging.error("'%s' is not a directory", old_file_storage_dir)
    # a pre-existing destination is only an error when the source is
    # still there as well
    if os.path.exists(old_file_storage_dir):
      logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                    old_file_storage_dir, new_file_storage_dir)
def JobQueuePurge():
  """Remove all job queue files and archived jobs.

  Both the live queue directory and the archived-jobs directory are
  emptied via the module's _CleanDirectory helper.

  """
  for queue_dir in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
    _CleanDirectory(queue_dir)
def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  Returns a (success_boolean, message) tuple.

  """
  # NOTE(review): the loop header over `disks` (binding `cf`) and the
  # initialization of `msg` are not visible in this view
    rd = _RecursiveFindBD(cf)
      # a device we cannot even find aborts the whole operation
      return (False, "Can't find device %s" % cf)
    except errors.BlockDeviceError, err:
      # collect per-device failures and report them all at once
      msg.append(str(err))
  return (False, "Can't make devices secondary: %s" % ",".join(msg))
  return (True, "All devices secondary")
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side; it executes the hook scripts found on this node.

  """
  # only scripts whose names match this pattern are considered hooks
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  def ExecHook(script, env):
    """Exec one hook script.

    Args:
      - script: the full path to the script
      - env: the environment with which to exec the script

    Returns:
      (success_boolean, output) tuple.

    """
    # NOTE(review): defined without `self` yet called below as
    # self.ExecHook(fname, env) — presumably a @staticmethod whose
    # decorator line is not visible in this view; confirm
    # exec the process using subprocess and log the output
    fdstdin = open("/dev/null", "r")
    child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT, close_fds=True,
                             shell=False, cwd="/", env=env)
    # capture at most the first 4 KiB of combined stdout/stderr
    output = child.stdout.read(4096)
    child.stdout.close()
    except EnvironmentError, err:
      output += "Hook script error: %s" % str(err)
    result = child.wait()
    except EnvironmentError, err:
      # EINTR (interrupted syscall) gets special handling here; the
      # branch body is not visible in this view
      if err.errno == errno.EINTR:
    # try not to leak fds
    for fd in (fdstdin, ):
      except EnvironmentError, err:
        # just log the error
        #logging.exception("Error while closing fd %s", fd)
    # script succeeded iff it exited with status 0
    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    """
    # map the phase onto the "<hpath>-<suffix>.d" directory name
    if phase == constants.HOOKS_PHASE_PRE:
    elif phase == constants.HOOKS_PHASE_POST:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # skip anything that is not an executable regular file with an
      # allowed (RE_MASK) name
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
              self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
      result, output = self.ExecHook(fname, env)
      rrval = constants.HKR_FAIL
      rrval = constants.HKR_SUCCESS
      # record (relative name, status, output) per hook script
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Args:
      - name: name of the allocator script to find and run
      - idata: input data handed to the script via a temporary file

    Return value: tuple of:
      - run status (one of the IARUN_ constants)
      - stdout of the script
      - stderr of the script
      - fail reason (as from utils.RunResult)

    """
    # locate the allocator binary in the configured search path
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the input data is passed to the script through a temp file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    result = utils.RunCmd([alloc_script, fin_name])
    # NOTE(review): presumably guarded by a result.failed check on an
    # elided line above — verify
    return (constants.IARUN_FAILURE, result.stdout, result.stderr,
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache is kept as small files under constants.BDEV_CACHE_DIR, one
  per device path.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    # NOTE(review): takes `cls` — presumably a @classmethod whose
    # decorator line is not visible in this view; confirm
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    # flatten any remaining separators so the cache stays a flat dir
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)

  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    Writes a one-line "owner state iv_name" record for dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
      # placeholder used when the device has no instance-visible name
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      # NOTE(review): the message says "update" although this method
      # removes the entry — looks like copy-paste; confirm before
      # changing the string
      logging.exception("Can't update bdev cache for %s", dev_path)