Move function cleaning directory to module level
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetSshRunner():
48   return ssh.SshRunner()
49
50
51 def _CleanDirectory(path):
52   if not os.path.isdir(path):
53     return
54   for rel_name in utils.ListVisibleFiles(path):
55     full_name = os.path.join(path, rel_name)
56     if os.path.isfile(full_name) and not os.path.islink(full_name):
57       utils.RemoveFile(full_name)
58
59
60 def _GetMasterInfo():
61   """Return the master ip and netdev.
62
63   """
64   try:
65     ss = ssconf.SimpleStore()
66     master_netdev = ss.GetMasterNetdev()
67     master_ip = ss.GetMasterIP()
68   except errors.ConfigurationError, err:
69     logging.exception("Cluster configuration incomplete")
70     return (None, None)
71   return (master_netdev, master_ip)
72
73
74 def StartMaster(start_daemons):
75   """Activate local node as master node.
76
77   The function will always try activate the IP address of the master
78   (if someone else has it, then it won't). Then, if the start_daemons
79   parameter is True, it will also start the master daemons
80   (ganet-masterd and ganeti-rapi).
81
82   """
83   ok = True
84   master_netdev, master_ip = _GetMasterInfo()
85   if not master_netdev:
86     return False
87
88   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
89     if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
90                      source=constants.LOCALHOST_IP_ADDRESS):
91       # we already have the ip:
92       logging.debug("Already started")
93     else:
94       logging.error("Someone else has the master ip, not activating")
95       ok = False
96   else:
97     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
98                            "dev", master_netdev, "label",
99                            "%s:0" % master_netdev])
100     if result.failed:
101       logging.error("Can't activate master IP: %s", result.output)
102       ok = False
103
104     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
105                            "-s", master_ip, master_ip])
106     # we'll ignore the exit code of arping
107
108   # and now start the master and rapi daemons
109   if start_daemons:
110     for daemon in 'ganeti-masterd', 'ganeti-rapi':
111       result = utils.RunCmd([daemon])
112       if result.failed:
113         logging.error("Can't start daemon %s: %s", daemon, result.output)
114         ok = False
115   return ok
116
117
118 def StopMaster(stop_daemons):
119   """Deactivate this node as master.
120
121   The function will always try to deactivate the IP address of the
122   master. Then, if the stop_daemons parameter is True, it will also
123   stop the master daemons (ganet-masterd and ganeti-rapi).
124
125   """
126   master_netdev, master_ip = _GetMasterInfo()
127   if not master_netdev:
128     return False
129
130   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
131                          "dev", master_netdev])
132   if result.failed:
133     logging.error("Can't remove the master IP, error: %s", result.output)
134     # but otherwise ignore the failure
135
136   if stop_daemons:
137     # stop/kill the rapi and the master daemon
138     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
139       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
140
141   return True
142
143
144 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
145   """Joins this node to the cluster.
146
147   This does the following:
148       - updates the hostkeys of the machine (rsa and dsa)
149       - adds the ssh private key to the user
150       - adds the ssh public key to the users' authorized_keys file
151
152   """
153   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
154                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
155                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
156                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
157   for name, content, mode in sshd_keys:
158     utils.WriteFile(name, data=content, mode=mode)
159
160   try:
161     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
162                                                     mkdir=True)
163   except errors.OpExecError, err:
164     logging.exception("Error while processing user ssh files")
165     return False
166
167   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
168     utils.WriteFile(name, data=content, mode=0600)
169
170   utils.AddAuthorizedKey(auth_keys, sshpub)
171
172   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
173
174   return True
175
176
177 def LeaveCluster():
178   """Cleans up the current node and prepares it to be removed from the cluster.
179
180   """
181   _CleanDirectory(constants.DATA_DIR)
182
183   JobQueuePurge()
184
185   try:
186     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
187   except errors.OpExecError:
188     logging.exception("Error while processing ssh files")
189     return
190
191   f = open(pub_key, 'r')
192   try:
193     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
194   finally:
195     f.close()
196
197   utils.RemoveFile(priv_key)
198   utils.RemoveFile(pub_key)
199
200   # Return a reassuring string to the caller, and quit
201   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
202
203
204 def GetNodeInfo(vgname):
205   """Gives back a hash with different informations about the node.
206
207   Returns:
208     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
209       'memory_free' : xxx, 'memory_total' : xxx }
210     where
211     vg_size is the size of the configured volume group in MiB
212     vg_free is the free size of the volume group in MiB
213     memory_dom0 is the memory allocated for domain0 in MiB
214     memory_free is the currently available (free) ram in MiB
215     memory_total is the total number of ram in MiB
216
217   """
218   outputarray = {}
219   vginfo = _GetVGInfo(vgname)
220   outputarray['vg_size'] = vginfo['vg_size']
221   outputarray['vg_free'] = vginfo['vg_free']
222
223   hyper = hypervisor.GetHypervisor()
224   hyp_info = hyper.GetNodeInfo()
225   if hyp_info is not None:
226     outputarray.update(hyp_info)
227
228   f = open("/proc/sys/kernel/random/boot_id", 'r')
229   try:
230     outputarray["bootid"] = f.read(128).rstrip("\n")
231   finally:
232     f.close()
233
234   return outputarray
235
236
237 def VerifyNode(what):
238   """Verify the status of the local node.
239
240   Args:
241     what - a dictionary of things to check:
242       'filelist' : list of files for which to compute checksums
243       'nodelist' : list of nodes we should check communication with
244       'hypervisor': run the hypervisor-specific verify
245
246   Requested files on local node are checksummed and the result returned.
247
248   The nodelist is traversed, with the following checks being made
249   for each node:
250   - known_hosts key correct
251   - correct resolving of node name (target node returns its own hostname
252     by ssh-execution of 'hostname', result compared against name in list.
253
254   """
255   result = {}
256
257   if 'hypervisor' in what:
258     result['hypervisor'] = hypervisor.GetHypervisor().Verify()
259
260   if 'filelist' in what:
261     result['filelist'] = utils.FingerprintFiles(what['filelist'])
262
263   if 'nodelist' in what:
264     result['nodelist'] = {}
265     random.shuffle(what['nodelist'])
266     for node in what['nodelist']:
267       success, message = _GetSshRunner().VerifyNodeHostname(node)
268       if not success:
269         result['nodelist'][node] = message
270   if 'node-net-test' in what:
271     result['node-net-test'] = {}
272     my_name = utils.HostInfo().name
273     my_pip = my_sip = None
274     for name, pip, sip in what['node-net-test']:
275       if name == my_name:
276         my_pip = pip
277         my_sip = sip
278         break
279     if not my_pip:
280       result['node-net-test'][my_name] = ("Can't find my own"
281                                           " primary/secondary IP"
282                                           " in the node list")
283     else:
284       port = ssconf.SimpleStore().GetNodeDaemonPort()
285       for name, pip, sip in what['node-net-test']:
286         fail = []
287         if not utils.TcpPing(pip, port, source=my_pip):
288           fail.append("primary")
289         if sip != pip:
290           if not utils.TcpPing(sip, port, source=my_sip):
291             fail.append("secondary")
292         if fail:
293           result['node-net-test'][name] = ("failure using the %s"
294                                            " interface(s)" %
295                                            " and ".join(fail))
296
297   return result
298
299
300 def GetVolumeList(vg_name):
301   """Compute list of logical volumes and their size.
302
303   Returns:
304     dictionary of all partions (key) with their size (in MiB), inactive
305     and online status:
306     {'test1': ('20.06', True, True)}
307
308   """
309   lvs = {}
310   sep = '|'
311   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
312                          "--separator=%s" % sep,
313                          "-olv_name,lv_size,lv_attr", vg_name])
314   if result.failed:
315     logging.error("Failed to list logical volumes, lvs output: %s",
316                   result.output)
317     return result.output
318
319   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
320   for line in result.stdout.splitlines():
321     line = line.strip()
322     match = valid_line_re.match(line)
323     if not match:
324       logging.error("Invalid line returned from lvs output: '%s'", line)
325       continue
326     name, size, attr = match.groups()
327     inactive = attr[4] == '-'
328     online = attr[5] == 'o'
329     lvs[name] = (size, inactive, online)
330
331   return lvs
332
333
334 def ListVolumeGroups():
335   """List the volume groups and their size.
336
337   Returns:
338     Dictionary with keys volume name and values the size of the volume
339
340   """
341   return utils.ListVolumeGroups()
342
343
344 def NodeVolumes():
345   """List all volumes on this node.
346
347   """
348   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
349                          "--separator=|",
350                          "--options=lv_name,lv_size,devices,vg_name"])
351   if result.failed:
352     logging.error("Failed to list logical volumes, lvs output: %s",
353                   result.output)
354     return {}
355
356   def parse_dev(dev):
357     if '(' in dev:
358       return dev.split('(')[0]
359     else:
360       return dev
361
362   def map_line(line):
363     return {
364       'name': line[0].strip(),
365       'size': line[1].strip(),
366       'dev': parse_dev(line[2].strip()),
367       'vg': line[3].strip(),
368     }
369
370   return [map_line(line.split('|')) for line in result.stdout.splitlines()
371           if line.count('|') >= 3]
372
373
374 def BridgesExist(bridges_list):
375   """Check if a list of bridges exist on the current node.
376
377   Returns:
378     True if all of them exist, false otherwise
379
380   """
381   for bridge in bridges_list:
382     if not utils.BridgeExists(bridge):
383       return False
384
385   return True
386
387
388 def GetInstanceList():
389   """Provides a list of instances.
390
391   Returns:
392     A list of all running instances on the current node
393     - instance1.example.com
394     - instance2.example.com
395
396   """
397   try:
398     names = hypervisor.GetHypervisor().ListInstances()
399   except errors.HypervisorError, err:
400     logging.exception("Error enumerating instances")
401     raise
402
403   return names
404
405
406 def GetInstanceInfo(instance):
407   """Gives back the informations about an instance as a dictionary.
408
409   Args:
410     instance: name of the instance (ex. instance1.example.com)
411
412   Returns:
413     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
414     where
415     memory: memory size of instance (int)
416     state: xen state of instance (string)
417     time: cpu time of instance (float)
418
419   """
420   output = {}
421
422   iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
423   if iinfo is not None:
424     output['memory'] = iinfo[2]
425     output['state'] = iinfo[4]
426     output['time'] = iinfo[5]
427
428   return output
429
430
431 def GetAllInstancesInfo():
432   """Gather data about all instances.
433
434   This is the equivalent of `GetInstanceInfo()`, except that it
435   computes data for all instances at once, thus being faster if one
436   needs data about more than one instance.
437
438   Returns: a dictionary of dictionaries, keys being the instance name,
439     and with values:
440     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
441     where
442     memory: memory size of instance (int)
443     state: xen state of instance (string)
444     time: cpu time of instance (float)
445     vcpus: the number of cpus
446
447   """
448   output = {}
449
450   iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
451   if iinfo:
452     for name, inst_id, memory, vcpus, state, times in iinfo:
453       output[name] = {
454         'memory': memory,
455         'vcpus': vcpus,
456         'state': state,
457         'time': times,
458         }
459
460   return output
461
462
463 def AddOSToInstance(instance, os_disk, swap_disk):
464   """Add an OS to an instance.
465
466   Args:
467     instance: the instance object
468     os_disk: the instance-visible name of the os device
469     swap_disk: the instance-visible name of the swap device
470
471   """
472   inst_os = OSFromDisk(instance.os)
473
474   create_script = inst_os.create_script
475
476   os_device = instance.FindDisk(os_disk)
477   if os_device is None:
478     logging.error("Can't find this device-visible name '%s'", os_disk)
479     return False
480
481   swap_device = instance.FindDisk(swap_disk)
482   if swap_device is None:
483     logging.error("Can't find this device-visible name '%s'", swap_disk)
484     return False
485
486   real_os_dev = _RecursiveFindBD(os_device)
487   if real_os_dev is None:
488     raise errors.BlockDeviceError("Block device '%s' is not set up" %
489                                   str(os_device))
490   real_os_dev.Open()
491
492   real_swap_dev = _RecursiveFindBD(swap_device)
493   if real_swap_dev is None:
494     raise errors.BlockDeviceError("Block device '%s' is not set up" %
495                                   str(swap_device))
496   real_swap_dev.Open()
497
498   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
499                                      instance.name, int(time.time()))
500   if not os.path.exists(constants.LOG_OS_DIR):
501     os.mkdir(constants.LOG_OS_DIR, 0750)
502
503   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
504                                 inst_os.path, create_script, instance.name,
505                                 real_os_dev.dev_path, real_swap_dev.dev_path,
506                                 logfile)
507
508   result = utils.RunCmd(command)
509   if result.failed:
510     logging.error("os create command '%s' returned error: %s, logfile: %s,"
511                   " output: %s", command, result.fail_reason, logfile,
512                   result.output)
513     return False
514
515   return True
516
517
518 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
519   """Run the OS rename script for an instance.
520
521   Args:
522     instance: the instance object
523     old_name: the old name of the instance
524     os_disk: the instance-visible name of the os device
525     swap_disk: the instance-visible name of the swap device
526
527   """
528   inst_os = OSFromDisk(instance.os)
529
530   script = inst_os.rename_script
531
532   os_device = instance.FindDisk(os_disk)
533   if os_device is None:
534     logging.error("Can't find this device-visible name '%s'", os_disk)
535     return False
536
537   swap_device = instance.FindDisk(swap_disk)
538   if swap_device is None:
539     logging.error("Can't find this device-visible name '%s'", swap_disk)
540     return False
541
542   real_os_dev = _RecursiveFindBD(os_device)
543   if real_os_dev is None:
544     raise errors.BlockDeviceError("Block device '%s' is not set up" %
545                                   str(os_device))
546   real_os_dev.Open()
547
548   real_swap_dev = _RecursiveFindBD(swap_device)
549   if real_swap_dev is None:
550     raise errors.BlockDeviceError("Block device '%s' is not set up" %
551                                   str(swap_device))
552   real_swap_dev.Open()
553
554   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
555                                            old_name,
556                                            instance.name, int(time.time()))
557   if not os.path.exists(constants.LOG_OS_DIR):
558     os.mkdir(constants.LOG_OS_DIR, 0750)
559
560   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
561                                 inst_os.path, script, old_name, instance.name,
562                                 real_os_dev.dev_path, real_swap_dev.dev_path,
563                                 logfile)
564
565   result = utils.RunCmd(command)
566
567   if result.failed:
568     logging.error("os create command '%s' returned error: %s output: %s",
569                   command, result.fail_reason, result.output)
570     return False
571
572   return True
573
574
575 def _GetVGInfo(vg_name):
576   """Get informations about the volume group.
577
578   Args:
579     vg_name: the volume group
580
581   Returns:
582     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
583     where
584     vg_size is the total size of the volume group in MiB
585     vg_free is the free size of the volume group in MiB
586     pv_count are the number of physical disks in that vg
587
588   If an error occurs during gathering of data, we return the same dict
589   with keys all set to None.
590
591   """
592   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
593
594   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
595                          "--nosuffix", "--units=m", "--separator=:", vg_name])
596
597   if retval.failed:
598     logging.error("volume group %s not present", vg_name)
599     return retdic
600   valarr = retval.stdout.strip().rstrip(':').split(':')
601   if len(valarr) == 3:
602     try:
603       retdic = {
604         "vg_size": int(round(float(valarr[0]), 0)),
605         "vg_free": int(round(float(valarr[1]), 0)),
606         "pv_count": int(valarr[2]),
607         }
608     except ValueError, err:
609       logging.exception("Fail to parse vgs output")
610   else:
611     logging.error("vgs output has the wrong number of fields (expected"
612                   " three): %s", str(valarr))
613   return retdic
614
615
616 def _GatherBlockDevs(instance):
617   """Set up an instance's block device(s).
618
619   This is run on the primary node at instance startup. The block
620   devices must be already assembled.
621
622   """
623   block_devices = []
624   for disk in instance.disks:
625     device = _RecursiveFindBD(disk)
626     if device is None:
627       raise errors.BlockDeviceError("Block device '%s' is not set up." %
628                                     str(disk))
629     device.Open()
630     block_devices.append((disk, device))
631   return block_devices
632
633
634 def StartInstance(instance, extra_args):
635   """Start an instance.
636
637   Args:
638     instance - name of instance to start.
639
640   """
641   running_instances = GetInstanceList()
642
643   if instance.name in running_instances:
644     return True
645
646   block_devices = _GatherBlockDevs(instance)
647   hyper = hypervisor.GetHypervisor()
648
649   try:
650     hyper.StartInstance(instance, block_devices, extra_args)
651   except errors.HypervisorError, err:
652     logging.exception("Failed to start instance")
653     return False
654
655   return True
656
657
658 def ShutdownInstance(instance):
659   """Shut an instance down.
660
661   Args:
662     instance - name of instance to shutdown.
663
664   """
665   running_instances = GetInstanceList()
666
667   if instance.name not in running_instances:
668     return True
669
670   hyper = hypervisor.GetHypervisor()
671   try:
672     hyper.StopInstance(instance)
673   except errors.HypervisorError, err:
674     logging.error("Failed to stop instance")
675     return False
676
677   # test every 10secs for 2min
678   shutdown_ok = False
679
680   time.sleep(1)
681   for dummy in range(11):
682     if instance.name not in GetInstanceList():
683       break
684     time.sleep(10)
685   else:
686     # the shutdown did not succeed
687     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
688
689     try:
690       hyper.StopInstance(instance, force=True)
691     except errors.HypervisorError, err:
692       logging.exception("Failed to stop instance")
693       return False
694
695     time.sleep(1)
696     if instance.name in GetInstanceList():
697       logging.error("could not shutdown instance '%s' even by destroy",
698                     instance.name)
699       return False
700
701   return True
702
703
704 def RebootInstance(instance, reboot_type, extra_args):
705   """Reboot an instance.
706
707   Args:
708     instance    - name of instance to reboot
709     reboot_type - how to reboot [soft,hard,full]
710
711   """
712   running_instances = GetInstanceList()
713
714   if instance.name not in running_instances:
715     logging.error("Cannot reboot instance that is not running")
716     return False
717
718   hyper = hypervisor.GetHypervisor()
719   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
720     try:
721       hyper.RebootInstance(instance)
722     except errors.HypervisorError, err:
723       logging.exception("Failed to soft reboot instance")
724       return False
725   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
726     try:
727       ShutdownInstance(instance)
728       StartInstance(instance, extra_args)
729     except errors.HypervisorError, err:
730       logging.exception("Failed to hard reboot instance")
731       return False
732   else:
733     raise errors.ParameterError("reboot_type invalid")
734
735
736   return True
737
738
739 def MigrateInstance(instance, target, live):
740   """Migrates an instance to another node.
741
742   """
743   hyper = hypervisor.GetHypervisor()
744
745   try:
746     hyper.MigrateInstance(instance, target, live)
747   except errors.HypervisorError, err:
748     msg = "Failed to migrate instance: %s" % str(err)
749     logging.error(msg)
750     return (False, msg)
751   return (True, "Migration successfull")
752
753
754 def CreateBlockDevice(disk, size, owner, on_primary, info):
755   """Creates a block device for an instance.
756
757   Args:
758    disk: a ganeti.objects.Disk object
759    size: the size of the physical underlying device
760    owner: a string with the name of the instance
761    on_primary: a boolean indicating if it is the primary node or not
762    info: string that will be sent to the physical device creation
763
764   Returns:
765     the new unique_id of the device (this can sometime be
766     computed only after creation), or None. On secondary nodes,
767     it's not required to return anything.
768
769   """
770   clist = []
771   if disk.children:
772     for child in disk.children:
773       crdev = _RecursiveAssembleBD(child, owner, on_primary)
774       if on_primary or disk.AssembleOnSecondary():
775         # we need the children open in case the device itself has to
776         # be assembled
777         crdev.Open()
778       clist.append(crdev)
779   try:
780     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
781     if device is not None:
782       logging.info("removing existing device %s", disk)
783       device.Remove()
784   except errors.BlockDeviceError, err:
785     pass
786
787   device = bdev.Create(disk.dev_type, disk.physical_id,
788                        clist, size)
789   if device is None:
790     raise ValueError("Can't create child device for %s, %s" %
791                      (disk, size))
792   if on_primary or disk.AssembleOnSecondary():
793     if not device.Assemble():
794       errorstring = "Can't assemble device after creation"
795       logging.error(errorstring)
796       raise errors.BlockDeviceError("%s, very unusual event - check the node"
797                                     " daemon logs" % errorstring)
798     device.SetSyncSpeed(constants.SYNC_SPEED)
799     if on_primary or disk.OpenOnSecondary():
800       device.Open(force=True)
801     DevCacheManager.UpdateCache(device.dev_path, owner,
802                                 on_primary, disk.iv_name)
803
804   device.SetInfo(info)
805
806   physical_id = device.unique_id
807   return physical_id
808
809
810 def RemoveBlockDevice(disk):
811   """Remove a block device.
812
813   This is intended to be called recursively.
814
815   """
816   try:
817     # since we are removing the device, allow a partial match
818     # this allows removal of broken mirrors
819     rdev = _RecursiveFindBD(disk, allow_partial=True)
820   except errors.BlockDeviceError, err:
821     # probably can't attach
822     logging.info("Can't attach to device %s in remove", disk)
823     rdev = None
824   if rdev is not None:
825     r_path = rdev.dev_path
826     result = rdev.Remove()
827     if result:
828       DevCacheManager.RemoveCache(r_path)
829   else:
830     result = True
831   if disk.children:
832     for child in disk.children:
833       result = result and RemoveBlockDevice(child)
834   return result
835
836
837 def _RecursiveAssembleBD(disk, owner, as_primary):
838   """Activate a block device for an instance.
839
840   This is run on the primary and secondary nodes for an instance.
841
842   This function is called recursively.
843
844   Args:
845     disk: a objects.Disk object
846     as_primary: if we should make the block device read/write
847
848   Returns:
849     the assembled device or None (in case no device was assembled)
850
851   If the assembly is not successful, an exception is raised.
852
853   """
854   children = []
855   if disk.children:
856     mcn = disk.ChildrenNeeded()
857     if mcn == -1:
858       mcn = 0 # max number of Nones allowed
859     else:
860       mcn = len(disk.children) - mcn # max number of Nones
861     for chld_disk in disk.children:
862       try:
863         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
864       except errors.BlockDeviceError, err:
865         if children.count(None) >= mcn:
866           raise
867         cdev = None
868         logging.debug("Error in child activation: %s", str(err))
869       children.append(cdev)
870
871   if as_primary or disk.AssembleOnSecondary():
872     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
873     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
874     result = r_dev
875     if as_primary or disk.OpenOnSecondary():
876       r_dev.Open()
877     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
878                                 as_primary, disk.iv_name)
879
880   else:
881     result = True
882   return result
883
884
885 def AssembleBlockDevice(disk, owner, as_primary):
886   """Activate a block device for an instance.
887
888   This is a wrapper over _RecursiveAssembleBD.
889
890   Returns:
891     a /dev path for primary nodes
892     True for secondary nodes
893
894   """
895   result = _RecursiveAssembleBD(disk, owner, as_primary)
896   if isinstance(result, bdev.BlockDev):
897     result = result.dev_path
898   return result
899
900
901 def ShutdownBlockDevice(disk):
902   """Shut down a block device.
903
904   First, if the device is assembled (can `Attach()`), then the device
905   is shutdown. Then the children of the device are shutdown.
906
907   This function is called recursively. Note that we don't cache the
908   children or such, as oppossed to assemble, shutdown of different
909   devices doesn't require that the upper device was active.
910
911   """
912   r_dev = _RecursiveFindBD(disk)
913   if r_dev is not None:
914     r_path = r_dev.dev_path
915     result = r_dev.Shutdown()
916     if result:
917       DevCacheManager.RemoveCache(r_path)
918   else:
919     result = True
920   if disk.children:
921     for child in disk.children:
922       result = result and ShutdownBlockDevice(child)
923   return result
924
925
926 def MirrorAddChildren(parent_cdev, new_cdevs):
927   """Extend a mirrored block device.
928
929   """
930   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
931   if parent_bdev is None:
932     logging.error("Can't find parent device")
933     return False
934   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
935   if new_bdevs.count(None) > 0:
936     logging.error("Can't find new device(s) to add: %s:%s",
937                   new_bdevs, new_cdevs)
938     return False
939   parent_bdev.AddChildren(new_bdevs)
940   return True
941
942
943 def MirrorRemoveChildren(parent_cdev, new_cdevs):
944   """Shrink a mirrored block device.
945
946   """
947   parent_bdev = _RecursiveFindBD(parent_cdev)
948   if parent_bdev is None:
949     logging.error("Can't find parent in remove children: %s", parent_cdev)
950     return False
951   devs = []
952   for disk in new_cdevs:
953     rpath = disk.StaticDevPath()
954     if rpath is None:
955       bd = _RecursiveFindBD(disk)
956       if bd is None:
957         logging.error("Can't find dynamic device %s while removing children",
958                       disk)
959         return False
960       else:
961         devs.append(bd.dev_path)
962     else:
963       devs.append(rpath)
964   parent_bdev.RemoveChildren(devs)
965   return True
966
967
968 def GetMirrorStatus(disks):
969   """Get the mirroring status of a list of devices.
970
971   Args:
972     disks: list of `objects.Disk`
973
974   Returns:
975     list of (mirror_done, estimated_time) tuples, which
976     are the result of bdev.BlockDevice.CombinedSyncStatus()
977
978   """
979   stats = []
980   for dsk in disks:
981     rbd = _RecursiveFindBD(dsk)
982     if rbd is None:
983       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
984     stats.append(rbd.CombinedSyncStatus())
985   return stats
986
987
988 def _RecursiveFindBD(disk, allow_partial=False):
989   """Check if a device is activated.
990
991   If so, return informations about the real device.
992
993   Args:
994     disk: the objects.Disk instance
995     allow_partial: don't abort the find if a child of the
996                    device can't be found; this is intended to be
997                    used when repairing mirrors
998
999   Returns:
1000     None if the device can't be found
1001     otherwise the device instance
1002
1003   """
1004   children = []
1005   if disk.children:
1006     for chdisk in disk.children:
1007       children.append(_RecursiveFindBD(chdisk))
1008
1009   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1010
1011
1012 def FindBlockDevice(disk):
1013   """Check if a device is activated.
1014
1015   If so, return informations about the real device.
1016
1017   Args:
1018     disk: the objects.Disk instance
1019   Returns:
1020     None if the device can't be found
1021     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1022
1023   """
1024   rbd = _RecursiveFindBD(disk)
1025   if rbd is None:
1026     return rbd
1027   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1028
1029 def _IsJobQueueFile(file_name):
1030   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1031   return os.path.commonprefix([queue_dir, file_name]) == queue_dir
1032
1033 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1034   """Write a file to the filesystem.
1035
1036   This allows the master to overwrite(!) a file. It will only perform
1037   the operation if the file belongs to a list of configuration files.
1038
1039   """
1040   if not os.path.isabs(file_name):
1041     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1042                   file_name)
1043     return False
1044
1045   allowed_files = [
1046     constants.CLUSTER_CONF_FILE,
1047     constants.ETC_HOSTS,
1048     constants.SSH_KNOWN_HOSTS_FILE,
1049     constants.VNC_PASSWORD_FILE,
1050     ]
1051   allowed_files.extend(ssconf.SimpleStore().GetFileList())
1052
1053   if not (file_name in allowed_files or _IsJobQueueFile(file_name)):
1054     logging.error("Filename passed to UploadFile not in allowed"
1055                  " upload targets: '%s'", file_name)
1056     return False
1057
1058   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1059                   atime=atime, mtime=mtime)
1060   return True
1061
1062
1063 def _ErrnoOrStr(err):
1064   """Format an EnvironmentError exception.
1065
1066   If the `err` argument has an errno attribute, it will be looked up
1067   and converted into a textual EXXXX description. Otherwise the string
1068   representation of the error will be returned.
1069
1070   """
1071   if hasattr(err, 'errno'):
1072     detail = errno.errorcode[err.errno]
1073   else:
1074     detail = str(err)
1075   return detail
1076
1077
1078 def _OSOndiskVersion(name, os_dir):
1079   """Compute and return the API version of a given OS.
1080
1081   This function will try to read the API version of the os given by
1082   the 'name' parameter and residing in the 'os_dir' directory.
1083
1084   Return value will be either an integer denoting the version or None in the
1085   case when this is not a valid OS name.
1086
1087   """
1088   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1089
1090   try:
1091     st = os.stat(api_file)
1092   except EnvironmentError, err:
1093     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1094                            " found (%s)" % _ErrnoOrStr(err))
1095
1096   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1097     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1098                            " a regular file")
1099
1100   try:
1101     f = open(api_file)
1102     try:
1103       api_version = f.read(256)
1104     finally:
1105       f.close()
1106   except EnvironmentError, err:
1107     raise errors.InvalidOS(name, os_dir, "error while reading the"
1108                            " API version (%s)" % _ErrnoOrStr(err))
1109
1110   api_version = api_version.strip()
1111   try:
1112     api_version = int(api_version)
1113   except (TypeError, ValueError), err:
1114     raise errors.InvalidOS(name, os_dir,
1115                            "API version is not integer (%s)" % str(err))
1116
1117   return api_version
1118
1119
1120 def DiagnoseOS(top_dirs=None):
1121   """Compute the validity for all OSes.
1122
1123   Returns an OS object for each name in all the given top directories
1124   (if not given defaults to constants.OS_SEARCH_PATH)
1125
1126   Returns:
1127     list of OS objects
1128
1129   """
1130   if top_dirs is None:
1131     top_dirs = constants.OS_SEARCH_PATH
1132
1133   result = []
1134   for dir_name in top_dirs:
1135     if os.path.isdir(dir_name):
1136       try:
1137         f_names = utils.ListVisibleFiles(dir_name)
1138       except EnvironmentError, err:
1139         logging.exception("Can't list the OS directory %s", dir_name)
1140         break
1141       for name in f_names:
1142         try:
1143           os_inst = OSFromDisk(name, base_dir=dir_name)
1144           result.append(os_inst)
1145         except errors.InvalidOS, err:
1146           result.append(objects.OS.FromInvalidOS(err))
1147
1148   return result
1149
1150
1151 def OSFromDisk(name, base_dir=None):
1152   """Create an OS instance from disk.
1153
1154   This function will return an OS instance if the given name is a
1155   valid OS name. Otherwise, it will raise an appropriate
1156   `errors.InvalidOS` exception, detailing why this is not a valid
1157   OS.
1158
1159   Args:
1160     os_dir: Directory containing the OS scripts. Defaults to a search
1161             in all the OS_SEARCH_PATH directories.
1162
1163   """
1164
1165   if base_dir is None:
1166     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1167     if os_dir is None:
1168       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1169   else:
1170     os_dir = os.path.sep.join([base_dir, name])
1171
1172   api_version = _OSOndiskVersion(name, os_dir)
1173
1174   if api_version != constants.OS_API_VERSION:
1175     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1176                            " (found %s want %s)"
1177                            % (api_version, constants.OS_API_VERSION))
1178
1179   # OS Scripts dictionary, we will populate it with the actual script names
1180   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1181
1182   for script in os_scripts:
1183     os_scripts[script] = os.path.sep.join([os_dir, script])
1184
1185     try:
1186       st = os.stat(os_scripts[script])
1187     except EnvironmentError, err:
1188       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1189                              (script, _ErrnoOrStr(err)))
1190
1191     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1192       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1193                              script)
1194
1195     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1196       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1197                              script)
1198
1199
1200   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1201                     create_script=os_scripts['create'],
1202                     export_script=os_scripts['export'],
1203                     import_script=os_scripts['import'],
1204                     rename_script=os_scripts['rename'],
1205                     api_version=api_version)
1206
1207
1208 def GrowBlockDevice(disk, amount):
1209   """Grow a stack of block devices.
1210
1211   This function is called recursively, with the childrens being the
1212   first one resize.
1213
1214   Args:
1215     disk: the disk to be grown
1216
1217   Returns: a tuple of (status, result), with:
1218     status: the result (true/false) of the operation
1219     result: the error message if the operation failed, otherwise not used
1220
1221   """
1222   r_dev = _RecursiveFindBD(disk)
1223   if r_dev is None:
1224     return False, "Cannot find block device %s" % (disk,)
1225
1226   try:
1227     r_dev.Grow(amount)
1228   except errors.BlockDeviceError, err:
1229     return False, str(err)
1230
1231   return True, None
1232
1233
1234 def SnapshotBlockDevice(disk):
1235   """Create a snapshot copy of a block device.
1236
1237   This function is called recursively, and the snapshot is actually created
1238   just for the leaf lvm backend device.
1239
1240   Args:
1241     disk: the disk to be snapshotted
1242
1243   Returns:
1244     a config entry for the actual lvm device snapshotted.
1245
1246   """
1247   if disk.children:
1248     if len(disk.children) == 1:
1249       # only one child, let's recurse on it
1250       return SnapshotBlockDevice(disk.children[0])
1251     else:
1252       # more than one child, choose one that matches
1253       for child in disk.children:
1254         if child.size == disk.size:
1255           # return implies breaking the loop
1256           return SnapshotBlockDevice(child)
1257   elif disk.dev_type == constants.LD_LV:
1258     r_dev = _RecursiveFindBD(disk)
1259     if r_dev is not None:
1260       # let's stay on the safe side and ask for the full size, for now
1261       return r_dev.Snapshot(disk.size)
1262     else:
1263       return None
1264   else:
1265     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1266                                  " '%s' of type '%s'" %
1267                                  (disk.unique_id, disk.dev_type))
1268
1269
1270 def ExportSnapshot(disk, dest_node, instance):
1271   """Export a block device snapshot to a remote node.
1272
1273   Args:
1274     disk: the snapshot block device
1275     dest_node: the node to send the image to
1276     instance: instance being exported
1277
1278   Returns:
1279     True if successful, False otherwise.
1280
1281   """
1282   inst_os = OSFromDisk(instance.os)
1283   export_script = inst_os.export_script
1284
1285   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1286                                      instance.name, int(time.time()))
1287   if not os.path.exists(constants.LOG_OS_DIR):
1288     os.mkdir(constants.LOG_OS_DIR, 0750)
1289
1290   real_os_dev = _RecursiveFindBD(disk)
1291   if real_os_dev is None:
1292     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1293                                   str(disk))
1294   real_os_dev.Open()
1295
1296   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1297   destfile = disk.physical_id[1]
1298
1299   # the target command is built out of three individual commands,
1300   # which are joined by pipes; we check each individual command for
1301   # valid parameters
1302
1303   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1304                                export_script, instance.name,
1305                                real_os_dev.dev_path, logfile)
1306
1307   comprcmd = "gzip"
1308
1309   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1310                                 destdir, destdir, destfile)
1311   remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1312                                        destcmd)
1313
1314   # all commands have been checked, so we're safe to combine them
1315   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1316
1317   result = utils.RunCmd(command)
1318
1319   if result.failed:
1320     logging.error("os snapshot export command '%s' returned error: %s"
1321                   " output: %s", command, result.fail_reason, result.output)
1322     return False
1323
1324   return True
1325
1326
1327 def FinalizeExport(instance, snap_disks):
1328   """Write out the export configuration information.
1329
1330   Args:
1331     instance: instance configuration
1332     snap_disks: snapshot block devices
1333
1334   Returns:
1335     False in case of error, True otherwise.
1336
1337   """
1338   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1339   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1340
1341   config = objects.SerializableConfigParser()
1342
1343   config.add_section(constants.INISECT_EXP)
1344   config.set(constants.INISECT_EXP, 'version', '0')
1345   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1346   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1347   config.set(constants.INISECT_EXP, 'os', instance.os)
1348   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1349
1350   config.add_section(constants.INISECT_INS)
1351   config.set(constants.INISECT_INS, 'name', instance.name)
1352   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1353   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1354   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1355
1356   nic_count = 0
1357   for nic_count, nic in enumerate(instance.nics):
1358     config.set(constants.INISECT_INS, 'nic%d_mac' %
1359                nic_count, '%s' % nic.mac)
1360     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1361     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1362                '%s' % nic.bridge)
1363   # TODO: redundant: on load can read nics until it doesn't exist
1364   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1365
1366   disk_count = 0
1367   for disk_count, disk in enumerate(snap_disks):
1368     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1369                ('%s' % disk.iv_name))
1370     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1371                ('%s' % disk.physical_id[1]))
1372     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1373                ('%d' % disk.size))
1374   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1375
1376   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1377   cfo = open(cff, 'w')
1378   try:
1379     config.write(cfo)
1380   finally:
1381     cfo.close()
1382
1383   shutil.rmtree(finaldestdir, True)
1384   shutil.move(destdir, finaldestdir)
1385
1386   return True
1387
1388
1389 def ExportInfo(dest):
1390   """Get export configuration information.
1391
1392   Args:
1393     dest: directory containing the export
1394
1395   Returns:
1396     A serializable config file containing the export info.
1397
1398   """
1399   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1400
1401   config = objects.SerializableConfigParser()
1402   config.read(cff)
1403
1404   if (not config.has_section(constants.INISECT_EXP) or
1405       not config.has_section(constants.INISECT_INS)):
1406     return None
1407
1408   return config
1409
1410
1411 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1412   """Import an os image into an instance.
1413
1414   Args:
1415     instance: the instance object
1416     os_disk: the instance-visible name of the os device
1417     swap_disk: the instance-visible name of the swap device
1418     src_node: node holding the source image
1419     src_image: path to the source image on src_node
1420
1421   Returns:
1422     False in case of error, True otherwise.
1423
1424   """
1425   inst_os = OSFromDisk(instance.os)
1426   import_script = inst_os.import_script
1427
1428   os_device = instance.FindDisk(os_disk)
1429   if os_device is None:
1430     logging.error("Can't find this device-visible name '%s'", os_disk)
1431     return False
1432
1433   swap_device = instance.FindDisk(swap_disk)
1434   if swap_device is None:
1435     logging.error("Can't find this device-visible name '%s'", swap_disk)
1436     return False
1437
1438   real_os_dev = _RecursiveFindBD(os_device)
1439   if real_os_dev is None:
1440     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1441                                   str(os_device))
1442   real_os_dev.Open()
1443
1444   real_swap_dev = _RecursiveFindBD(swap_device)
1445   if real_swap_dev is None:
1446     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1447                                   str(swap_device))
1448   real_swap_dev.Open()
1449
1450   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1451                                         instance.name, int(time.time()))
1452   if not os.path.exists(constants.LOG_OS_DIR):
1453     os.mkdir(constants.LOG_OS_DIR, 0750)
1454
1455   destcmd = utils.BuildShellCmd('cat %s', src_image)
1456   remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
1457                                        destcmd)
1458
1459   comprcmd = "gunzip"
1460   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1461                                inst_os.path, import_script, instance.name,
1462                                real_os_dev.dev_path, real_swap_dev.dev_path,
1463                                logfile)
1464
1465   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1466
1467   result = utils.RunCmd(command)
1468
1469   if result.failed:
1470     logging.error("os import command '%s' returned error: %s"
1471                   " output: %s", command, result.fail_reason, result.output)
1472     return False
1473
1474   return True
1475
1476
1477 def ListExports():
1478   """Return a list of exports currently available on this machine.
1479
1480   """
1481   if os.path.isdir(constants.EXPORT_DIR):
1482     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1483   else:
1484     return []
1485
1486
1487 def RemoveExport(export):
1488   """Remove an existing export from the node.
1489
1490   Args:
1491     export: the name of the export to remove
1492
1493   Returns:
1494     False in case of error, True otherwise.
1495
1496   """
1497   target = os.path.join(constants.EXPORT_DIR, export)
1498
1499   shutil.rmtree(target)
1500   # TODO: catch some of the relevant exceptions and provide a pretty
1501   # error message if rmtree fails.
1502
1503   return True
1504
1505
1506 def RenameBlockDevices(devlist):
1507   """Rename a list of block devices.
1508
1509   The devlist argument is a list of tuples (disk, new_logical,
1510   new_physical). The return value will be a combined boolean result
1511   (True only if all renames succeeded).
1512
1513   """
1514   result = True
1515   for disk, unique_id in devlist:
1516     dev = _RecursiveFindBD(disk)
1517     if dev is None:
1518       result = False
1519       continue
1520     try:
1521       old_rpath = dev.dev_path
1522       dev.Rename(unique_id)
1523       new_rpath = dev.dev_path
1524       if old_rpath != new_rpath:
1525         DevCacheManager.RemoveCache(old_rpath)
1526         # FIXME: we should add the new cache information here, like:
1527         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1528         # but we don't have the owner here - maybe parse from existing
1529         # cache? for now, we only lose lvm data when we rename, which
1530         # is less critical than DRBD or MD
1531     except errors.BlockDeviceError, err:
1532       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1533       result = False
1534   return result
1535
1536
1537 def _TransformFileStorageDir(file_storage_dir):
1538   """Checks whether given file_storage_dir is valid.
1539
1540   Checks wheter the given file_storage_dir is within the cluster-wide
1541   default file_storage_dir stored in SimpleStore. Only paths under that
1542   directory are allowed.
1543
1544   Args:
1545     file_storage_dir: string with path
1546
1547   Returns:
1548     normalized file_storage_dir (string) if valid, None otherwise
1549
1550   """
1551   file_storage_dir = os.path.normpath(file_storage_dir)
1552   base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
1553   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1554       base_file_storage_dir):
1555     logging.error("file storage directory '%s' is not under base file"
1556                   " storage directory '%s'",
1557                   file_storage_dir, base_file_storage_dir)
1558     return None
1559   return file_storage_dir
1560
1561
1562 def CreateFileStorageDir(file_storage_dir):
1563   """Create file storage directory.
1564
1565   Args:
1566     file_storage_dir: string containing the path
1567
1568   Returns:
1569     tuple with first element a boolean indicating wheter dir
1570     creation was successful or not
1571
1572   """
1573   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1574   result = True,
1575   if not file_storage_dir:
1576     result = False,
1577   else:
1578     if os.path.exists(file_storage_dir):
1579       if not os.path.isdir(file_storage_dir):
1580         logging.error("'%s' is not a directory", file_storage_dir)
1581         result = False,
1582     else:
1583       try:
1584         os.makedirs(file_storage_dir, 0750)
1585       except OSError, err:
1586         logging.error("Cannot create file storage directory '%s': %s",
1587                       file_storage_dir, err)
1588         result = False,
1589   return result
1590
1591
1592 def RemoveFileStorageDir(file_storage_dir):
1593   """Remove file storage directory.
1594
1595   Remove it only if it's empty. If not log an error and return.
1596
1597   Args:
1598     file_storage_dir: string containing the path
1599
1600   Returns:
1601     tuple with first element a boolean indicating wheter dir
1602     removal was successful or not
1603
1604   """
1605   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1606   result = True,
1607   if not file_storage_dir:
1608     result = False,
1609   else:
1610     if os.path.exists(file_storage_dir):
1611       if not os.path.isdir(file_storage_dir):
1612         logging.error("'%s' is not a directory", file_storage_dir)
1613         result = False,
1614       # deletes dir only if empty, otherwise we want to return False
1615       try:
1616         os.rmdir(file_storage_dir)
1617       except OSError, err:
1618         logging.exception("Cannot remove file storage directory '%s'",
1619                           file_storage_dir)
1620         result = False,
1621   return result
1622
1623
1624 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1625   """Rename the file storage directory.
1626
1627   Args:
1628     old_file_storage_dir: string containing the old path
1629     new_file_storage_dir: string containing the new path
1630
1631   Returns:
1632     tuple with first element a boolean indicating wheter dir
1633     rename was successful or not
1634
1635   """
1636   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1637   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1638   result = True,
1639   if not old_file_storage_dir or not new_file_storage_dir:
1640     result = False,
1641   else:
1642     if not os.path.exists(new_file_storage_dir):
1643       if os.path.isdir(old_file_storage_dir):
1644         try:
1645           os.rename(old_file_storage_dir, new_file_storage_dir)
1646         except OSError, err:
1647           logging.exception("Cannot rename '%s' to '%s'",
1648                             old_file_storage_dir, new_file_storage_dir)
1649           result =  False,
1650       else:
1651         logging.error("'%s' is not a directory", old_file_storage_dir)
1652         result = False,
1653     else:
1654       if os.path.exists(old_file_storage_dir):
1655         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1656                       old_file_storage_dir, new_file_storage_dir)
1657         result = False,
1658   return result
1659
1660
1661 def JobQueuePurge():
1662   """Removes job queue files and archived jobs
1663
1664   """
1665   _CleanDirectory(constants.QUEUE_DIR)
1666   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
1667
1668
1669 def CloseBlockDevices(disks):
1670   """Closes the given block devices.
1671
1672   This means they will be switched to secondary mode (in case of DRBD).
1673
1674   """
1675   bdevs = []
1676   for cf in disks:
1677     rd = _RecursiveFindBD(cf)
1678     if rd is None:
1679       return (False, "Can't find device %s" % cf)
1680     bdevs.append(rd)
1681
1682   msg = []
1683   for rd in bdevs:
1684     try:
1685       rd.Close()
1686     except errors.BlockDeviceError, err:
1687       msg.append(str(err))
1688   if msg:
1689     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1690   else:
1691     return (True, "All devices secondary")
1692
1693
1694 class HooksRunner(object):
1695   """Hook runner.
1696
1697   This class is instantiated on the node side (ganeti-noded) and not on
1698   the master side.
1699
1700   """
1701   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1702
1703   def __init__(self, hooks_base_dir=None):
1704     """Constructor for hooks runner.
1705
1706     Args:
1707       - hooks_base_dir: if not None, this overrides the
1708         constants.HOOKS_BASE_DIR (useful for unittests)
1709
1710     """
1711     if hooks_base_dir is None:
1712       hooks_base_dir = constants.HOOKS_BASE_DIR
1713     self._BASE_DIR = hooks_base_dir
1714
1715   @staticmethod
1716   def ExecHook(script, env):
1717     """Exec one hook script.
1718
1719     Args:
1720      - script: the full path to the script
1721      - env: the environment with which to exec the script
1722
1723     """
1724     # exec the process using subprocess and log the output
1725     fdstdin = None
1726     try:
1727       fdstdin = open("/dev/null", "r")
1728       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1729                                stderr=subprocess.STDOUT, close_fds=True,
1730                                shell=False, cwd="/", env=env)
1731       output = ""
1732       try:
1733         output = child.stdout.read(4096)
1734         child.stdout.close()
1735       except EnvironmentError, err:
1736         output += "Hook script error: %s" % str(err)
1737
1738       while True:
1739         try:
1740           result = child.wait()
1741           break
1742         except EnvironmentError, err:
1743           if err.errno == errno.EINTR:
1744             continue
1745           raise
1746     finally:
1747       # try not to leak fds
1748       for fd in (fdstdin, ):
1749         if fd is not None:
1750           try:
1751             fd.close()
1752           except EnvironmentError, err:
1753             # just log the error
1754             #logging.exception("Error while closing fd %s", fd)
1755             pass
1756
1757     return result == 0, output
1758
1759   def RunHooks(self, hpath, phase, env):
1760     """Run the scripts in the hooks directory.
1761
1762     This method will not be usually overriden by child opcodes.
1763
1764     """
1765     if phase == constants.HOOKS_PHASE_PRE:
1766       suffix = "pre"
1767     elif phase == constants.HOOKS_PHASE_POST:
1768       suffix = "post"
1769     else:
1770       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1771     rr = []
1772
1773     subdir = "%s-%s.d" % (hpath, suffix)
1774     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1775     try:
1776       dir_contents = utils.ListVisibleFiles(dir_name)
1777     except OSError, err:
1778       # must log
1779       return rr
1780
1781     # we use the standard python sort order,
1782     # so 00name is the recommended naming scheme
1783     dir_contents.sort()
1784     for relname in dir_contents:
1785       fname = os.path.join(dir_name, relname)
1786       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1787           self.RE_MASK.match(relname) is not None):
1788         rrval = constants.HKR_SKIP
1789         output = ""
1790       else:
1791         result, output = self.ExecHook(fname, env)
1792         if not result:
1793           rrval = constants.HKR_FAIL
1794         else:
1795           rrval = constants.HKR_SUCCESS
1796       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1797
1798     return rr
1799
1800
1801 class IAllocatorRunner(object):
1802   """IAllocator runner.
1803
1804   This class is instantiated on the node side (ganeti-noded) and not on
1805   the master side.
1806
1807   """
1808   def Run(self, name, idata):
1809     """Run an iallocator script.
1810
1811     Return value: tuple of:
1812        - run status (one of the IARUN_ constants)
1813        - stdout
1814        - stderr
1815        - fail reason (as from utils.RunResult)
1816
1817     """
1818     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1819                                   os.path.isfile)
1820     if alloc_script is None:
1821       return (constants.IARUN_NOTFOUND, None, None, None)
1822
1823     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1824     try:
1825       os.write(fd, idata)
1826       os.close(fd)
1827       result = utils.RunCmd([alloc_script, fin_name])
1828       if result.failed:
1829         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1830                 result.fail_reason)
1831     finally:
1832       os.unlink(fin_name)
1833
1834     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1835
1836
1837 class DevCacheManager(object):
1838   """Simple class for managing a cache of block device information.
1839
1840   """
1841   _DEV_PREFIX = "/dev/"
1842   _ROOT_DIR = constants.BDEV_CACHE_DIR
1843
1844   @classmethod
1845   def _ConvertPath(cls, dev_path):
1846     """Converts a /dev/name path to the cache file name.
1847
1848     This replaces slashes with underscores and strips the /dev
1849     prefix. It then returns the full path to the cache file
1850
1851     """
1852     if dev_path.startswith(cls._DEV_PREFIX):
1853       dev_path = dev_path[len(cls._DEV_PREFIX):]
1854     dev_path = dev_path.replace("/", "_")
1855     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1856     return fpath
1857
1858   @classmethod
1859   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1860     """Updates the cache information for a given device.
1861
1862     """
1863     if dev_path is None:
1864       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1865       return
1866     fpath = cls._ConvertPath(dev_path)
1867     if on_primary:
1868       state = "primary"
1869     else:
1870       state = "secondary"
1871     if iv_name is None:
1872       iv_name = "not_visible"
1873     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1874     try:
1875       utils.WriteFile(fpath, data=fdata)
1876     except EnvironmentError, err:
1877       logging.exception("Can't update bdev cache for %s", dev_path)
1878
1879   @classmethod
1880   def RemoveCache(cls, dev_path):
1881     """Remove data for a dev_path.
1882
1883     """
1884     if dev_path is None:
1885       logging.error("DevCacheManager.RemoveCache got a None dev_path")
1886       return
1887     fpath = cls._ConvertPath(dev_path)
1888     try:
1889       utils.RemoveFile(fpath)
1890     except EnvironmentError, err:
1891       logging.exception("Can't update bdev cache for %s", dev_path)