Convert backend.py
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetConfig():
48   return ssconf.SimpleConfigReader()
49
50
51 def _GetSshRunner():
52   return ssh.SshRunner()
53
54
55 def _CleanDirectory(path, exclude=[]):
56   """Removes all regular files in a directory.
57
58   @param exclude: List of files to be excluded.
59   @type exclude: list
60
61   """
62   if not os.path.isdir(path):
63     return
64
65   # Normalize excluded paths
66   exclude = [os.path.normpath(i) for i in exclude]
67
68   for rel_name in utils.ListVisibleFiles(path):
69     full_name = os.path.normpath(os.path.join(path, rel_name))
70     if full_name in exclude:
71       continue
72     if os.path.isfile(full_name) and not os.path.islink(full_name):
73       utils.RemoveFile(full_name)
74
75
76 def JobQueuePurge():
77   """Removes job queue files and archived jobs
78
79   """
80   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82
83
84 def GetMasterInfo():
85   """Returns master information.
86
87   This is an utility function to compute master information, either
88   for consumption here or from the node daemon.
89
90   @rtype: tuple
91   @return: (master_netdev, master_ip, master_name)
92
93   """
94   try:
95     cfg = _GetConfig()
96     master_netdev = cfg.GetMasterNetdev()
97     master_ip = cfg.GetMasterIP()
98     master_node = cfg.GetMasterNode()
99   except errors.ConfigurationError, err:
100     logging.exception("Cluster configuration incomplete")
101     return (None, None)
102   return (master_netdev, master_ip, master_node)
103
104
105 def StartMaster(start_daemons):
106   """Activate local node as master node.
107
108   The function will always try activate the IP address of the master
109   (if someone else has it, then it won't). Then, if the start_daemons
110   parameter is True, it will also start the master daemons
111   (ganet-masterd and ganeti-rapi).
112
113   """
114   ok = True
115   master_netdev, master_ip, _ = GetMasterInfo()
116   if not master_netdev:
117     return False
118
119   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120     if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
121                      source=constants.LOCALHOST_IP_ADDRESS):
122       # we already have the ip:
123       logging.debug("Already started")
124     else:
125       logging.error("Someone else has the master ip, not activating")
126       ok = False
127   else:
128     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
129                            "dev", master_netdev, "label",
130                            "%s:0" % master_netdev])
131     if result.failed:
132       logging.error("Can't activate master IP: %s", result.output)
133       ok = False
134
135     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
136                            "-s", master_ip, master_ip])
137     # we'll ignore the exit code of arping
138
139   # and now start the master and rapi daemons
140   if start_daemons:
141     for daemon in 'ganeti-masterd', 'ganeti-rapi':
142       result = utils.RunCmd([daemon])
143       if result.failed:
144         logging.error("Can't start daemon %s: %s", daemon, result.output)
145         ok = False
146   return ok
147
148
149 def StopMaster(stop_daemons):
150   """Deactivate this node as master.
151
152   The function will always try to deactivate the IP address of the
153   master. Then, if the stop_daemons parameter is True, it will also
154   stop the master daemons (ganet-masterd and ganeti-rapi).
155
156   """
157   master_netdev, master_ip, _ = GetMasterInfo()
158   if not master_netdev:
159     return False
160
161   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
162                          "dev", master_netdev])
163   if result.failed:
164     logging.error("Can't remove the master IP, error: %s", result.output)
165     # but otherwise ignore the failure
166
167   if stop_daemons:
168     # stop/kill the rapi and the master daemon
169     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
170       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
171
172   return True
173
174
175 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
176   """Joins this node to the cluster.
177
178   This does the following:
179       - updates the hostkeys of the machine (rsa and dsa)
180       - adds the ssh private key to the user
181       - adds the ssh public key to the users' authorized_keys file
182
183   """
184   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
185                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
186                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
187                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
188   for name, content, mode in sshd_keys:
189     utils.WriteFile(name, data=content, mode=mode)
190
191   try:
192     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
193                                                     mkdir=True)
194   except errors.OpExecError, err:
195     logging.exception("Error while processing user ssh files")
196     return False
197
198   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
199     utils.WriteFile(name, data=content, mode=0600)
200
201   utils.AddAuthorizedKey(auth_keys, sshpub)
202
203   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
204
205   return True
206
207
208 def LeaveCluster():
209   """Cleans up the current node and prepares it to be removed from the cluster.
210
211   """
212   _CleanDirectory(constants.DATA_DIR)
213   JobQueuePurge()
214
215   try:
216     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
217   except errors.OpExecError:
218     logging.exception("Error while processing ssh files")
219     return
220
221   f = open(pub_key, 'r')
222   try:
223     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
224   finally:
225     f.close()
226
227   utils.RemoveFile(priv_key)
228   utils.RemoveFile(pub_key)
229
230   # Return a reassuring string to the caller, and quit
231   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
232
233
234 def GetNodeInfo(vgname):
235   """Gives back a hash with different informations about the node.
236
237   Returns:
238     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
239       'memory_free' : xxx, 'memory_total' : xxx }
240     where
241     vg_size is the size of the configured volume group in MiB
242     vg_free is the free size of the volume group in MiB
243     memory_dom0 is the memory allocated for domain0 in MiB
244     memory_free is the currently available (free) ram in MiB
245     memory_total is the total number of ram in MiB
246
247   """
248   outputarray = {}
249   vginfo = _GetVGInfo(vgname)
250   outputarray['vg_size'] = vginfo['vg_size']
251   outputarray['vg_free'] = vginfo['vg_free']
252
253   hyper = hypervisor.GetHypervisor()
254   hyp_info = hyper.GetNodeInfo()
255   if hyp_info is not None:
256     outputarray.update(hyp_info)
257
258   f = open("/proc/sys/kernel/random/boot_id", 'r')
259   try:
260     outputarray["bootid"] = f.read(128).rstrip("\n")
261   finally:
262     f.close()
263
264   return outputarray
265
266
267 def VerifyNode(what):
268   """Verify the status of the local node.
269
270   Args:
271     what - a dictionary of things to check:
272       'filelist' : list of files for which to compute checksums
273       'nodelist' : list of nodes we should check communication with
274       'hypervisor': run the hypervisor-specific verify
275
276   Requested files on local node are checksummed and the result returned.
277
278   The nodelist is traversed, with the following checks being made
279   for each node:
280   - known_hosts key correct
281   - correct resolving of node name (target node returns its own hostname
282     by ssh-execution of 'hostname', result compared against name in list.
283
284   """
285   result = {}
286
287   if 'hypervisor' in what:
288     result['hypervisor'] = hypervisor.GetHypervisor().Verify()
289
290   if 'filelist' in what:
291     result['filelist'] = utils.FingerprintFiles(what['filelist'])
292
293   if 'nodelist' in what:
294     result['nodelist'] = {}
295     random.shuffle(what['nodelist'])
296     for node in what['nodelist']:
297       success, message = _GetSshRunner().VerifyNodeHostname(node)
298       if not success:
299         result['nodelist'][node] = message
300   if 'node-net-test' in what:
301     result['node-net-test'] = {}
302     my_name = utils.HostInfo().name
303     my_pip = my_sip = None
304     for name, pip, sip in what['node-net-test']:
305       if name == my_name:
306         my_pip = pip
307         my_sip = sip
308         break
309     if not my_pip:
310       result['node-net-test'][my_name] = ("Can't find my own"
311                                           " primary/secondary IP"
312                                           " in the node list")
313     else:
314       port = utils.GetNodeDaemonPort()
315       for name, pip, sip in what['node-net-test']:
316         fail = []
317         if not utils.TcpPing(pip, port, source=my_pip):
318           fail.append("primary")
319         if sip != pip:
320           if not utils.TcpPing(sip, port, source=my_sip):
321             fail.append("secondary")
322         if fail:
323           result['node-net-test'][name] = ("failure using the %s"
324                                            " interface(s)" %
325                                            " and ".join(fail))
326
327   return result
328
329
330 def GetVolumeList(vg_name):
331   """Compute list of logical volumes and their size.
332
333   Returns:
334     dictionary of all partions (key) with their size (in MiB), inactive
335     and online status:
336     {'test1': ('20.06', True, True)}
337
338   """
339   lvs = {}
340   sep = '|'
341   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
342                          "--separator=%s" % sep,
343                          "-olv_name,lv_size,lv_attr", vg_name])
344   if result.failed:
345     logging.error("Failed to list logical volumes, lvs output: %s",
346                   result.output)
347     return result.output
348
349   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
350   for line in result.stdout.splitlines():
351     line = line.strip()
352     match = valid_line_re.match(line)
353     if not match:
354       logging.error("Invalid line returned from lvs output: '%s'", line)
355       continue
356     name, size, attr = match.groups()
357     inactive = attr[4] == '-'
358     online = attr[5] == 'o'
359     lvs[name] = (size, inactive, online)
360
361   return lvs
362
363
364 def ListVolumeGroups():
365   """List the volume groups and their size.
366
367   Returns:
368     Dictionary with keys volume name and values the size of the volume
369
370   """
371   return utils.ListVolumeGroups()
372
373
374 def NodeVolumes():
375   """List all volumes on this node.
376
377   """
378   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
379                          "--separator=|",
380                          "--options=lv_name,lv_size,devices,vg_name"])
381   if result.failed:
382     logging.error("Failed to list logical volumes, lvs output: %s",
383                   result.output)
384     return {}
385
386   def parse_dev(dev):
387     if '(' in dev:
388       return dev.split('(')[0]
389     else:
390       return dev
391
392   def map_line(line):
393     return {
394       'name': line[0].strip(),
395       'size': line[1].strip(),
396       'dev': parse_dev(line[2].strip()),
397       'vg': line[3].strip(),
398     }
399
400   return [map_line(line.split('|')) for line in result.stdout.splitlines()
401           if line.count('|') >= 3]
402
403
404 def BridgesExist(bridges_list):
405   """Check if a list of bridges exist on the current node.
406
407   Returns:
408     True if all of them exist, false otherwise
409
410   """
411   for bridge in bridges_list:
412     if not utils.BridgeExists(bridge):
413       return False
414
415   return True
416
417
418 def GetInstanceList():
419   """Provides a list of instances.
420
421   Returns:
422     A list of all running instances on the current node
423     - instance1.example.com
424     - instance2.example.com
425
426   """
427   try:
428     names = hypervisor.GetHypervisor().ListInstances()
429   except errors.HypervisorError, err:
430     logging.exception("Error enumerating instances")
431     raise
432
433   return names
434
435
436 def GetInstanceInfo(instance):
437   """Gives back the informations about an instance as a dictionary.
438
439   Args:
440     instance: name of the instance (ex. instance1.example.com)
441
442   Returns:
443     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
444     where
445     memory: memory size of instance (int)
446     state: xen state of instance (string)
447     time: cpu time of instance (float)
448
449   """
450   output = {}
451
452   iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
453   if iinfo is not None:
454     output['memory'] = iinfo[2]
455     output['state'] = iinfo[4]
456     output['time'] = iinfo[5]
457
458   return output
459
460
461 def GetAllInstancesInfo():
462   """Gather data about all instances.
463
464   This is the equivalent of `GetInstanceInfo()`, except that it
465   computes data for all instances at once, thus being faster if one
466   needs data about more than one instance.
467
468   Returns: a dictionary of dictionaries, keys being the instance name,
469     and with values:
470     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
471     where
472     memory: memory size of instance (int)
473     state: xen state of instance (string)
474     time: cpu time of instance (float)
475     vcpus: the number of cpus
476
477   """
478   output = {}
479
480   iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
481   if iinfo:
482     for name, inst_id, memory, vcpus, state, times in iinfo:
483       output[name] = {
484         'memory': memory,
485         'vcpus': vcpus,
486         'state': state,
487         'time': times,
488         }
489
490   return output
491
492
493 def AddOSToInstance(instance, os_disk, swap_disk):
494   """Add an OS to an instance.
495
496   Args:
497     instance: the instance object
498     os_disk: the instance-visible name of the os device
499     swap_disk: the instance-visible name of the swap device
500
501   """
502   cfg = _GetConfig()
503   inst_os = OSFromDisk(instance.os)
504
505   create_script = inst_os.create_script
506
507   os_device = instance.FindDisk(os_disk)
508   if os_device is None:
509     logging.error("Can't find this device-visible name '%s'", os_disk)
510     return False
511
512   swap_device = instance.FindDisk(swap_disk)
513   if swap_device is None:
514     logging.error("Can't find this device-visible name '%s'", swap_disk)
515     return False
516
517   real_os_dev = _RecursiveFindBD(os_device)
518   if real_os_dev is None:
519     raise errors.BlockDeviceError("Block device '%s' is not set up" %
520                                   str(os_device))
521   real_os_dev.Open()
522
523   real_swap_dev = _RecursiveFindBD(swap_device)
524   if real_swap_dev is None:
525     raise errors.BlockDeviceError("Block device '%s' is not set up" %
526                                   str(swap_device))
527   real_swap_dev.Open()
528
529   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
530                                      instance.name, int(time.time()))
531   if not os.path.exists(constants.LOG_OS_DIR):
532     os.mkdir(constants.LOG_OS_DIR, 0750)
533
534   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
535                                 inst_os.path, create_script, instance.name,
536                                 real_os_dev.dev_path, real_swap_dev.dev_path,
537                                 logfile)
538   env = {'HYPERVISOR': cfg.GetHypervisorType()}
539
540   result = utils.RunCmd(command, env=env)
541   if result.failed:
542     logging.error("os create command '%s' returned error: %s, logfile: %s,"
543                   " output: %s", command, result.fail_reason, logfile,
544                   result.output)
545     return False
546
547   return True
548
549
550 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
551   """Run the OS rename script for an instance.
552
553   Args:
554     instance: the instance object
555     old_name: the old name of the instance
556     os_disk: the instance-visible name of the os device
557     swap_disk: the instance-visible name of the swap device
558
559   """
560   inst_os = OSFromDisk(instance.os)
561
562   script = inst_os.rename_script
563
564   os_device = instance.FindDisk(os_disk)
565   if os_device is None:
566     logging.error("Can't find this device-visible name '%s'", os_disk)
567     return False
568
569   swap_device = instance.FindDisk(swap_disk)
570   if swap_device is None:
571     logging.error("Can't find this device-visible name '%s'", swap_disk)
572     return False
573
574   real_os_dev = _RecursiveFindBD(os_device)
575   if real_os_dev is None:
576     raise errors.BlockDeviceError("Block device '%s' is not set up" %
577                                   str(os_device))
578   real_os_dev.Open()
579
580   real_swap_dev = _RecursiveFindBD(swap_device)
581   if real_swap_dev is None:
582     raise errors.BlockDeviceError("Block device '%s' is not set up" %
583                                   str(swap_device))
584   real_swap_dev.Open()
585
586   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
587                                            old_name,
588                                            instance.name, int(time.time()))
589   if not os.path.exists(constants.LOG_OS_DIR):
590     os.mkdir(constants.LOG_OS_DIR, 0750)
591
592   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
593                                 inst_os.path, script, old_name, instance.name,
594                                 real_os_dev.dev_path, real_swap_dev.dev_path,
595                                 logfile)
596
597   result = utils.RunCmd(command)
598
599   if result.failed:
600     logging.error("os create command '%s' returned error: %s output: %s",
601                   command, result.fail_reason, result.output)
602     return False
603
604   return True
605
606
607 def _GetVGInfo(vg_name):
608   """Get informations about the volume group.
609
610   Args:
611     vg_name: the volume group
612
613   Returns:
614     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
615     where
616     vg_size is the total size of the volume group in MiB
617     vg_free is the free size of the volume group in MiB
618     pv_count are the number of physical disks in that vg
619
620   If an error occurs during gathering of data, we return the same dict
621   with keys all set to None.
622
623   """
624   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
625
626   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
627                          "--nosuffix", "--units=m", "--separator=:", vg_name])
628
629   if retval.failed:
630     logging.error("volume group %s not present", vg_name)
631     return retdic
632   valarr = retval.stdout.strip().rstrip(':').split(':')
633   if len(valarr) == 3:
634     try:
635       retdic = {
636         "vg_size": int(round(float(valarr[0]), 0)),
637         "vg_free": int(round(float(valarr[1]), 0)),
638         "pv_count": int(valarr[2]),
639         }
640     except ValueError, err:
641       logging.exception("Fail to parse vgs output")
642   else:
643     logging.error("vgs output has the wrong number of fields (expected"
644                   " three): %s", str(valarr))
645   return retdic
646
647
648 def _GatherBlockDevs(instance):
649   """Set up an instance's block device(s).
650
651   This is run on the primary node at instance startup. The block
652   devices must be already assembled.
653
654   """
655   block_devices = []
656   for disk in instance.disks:
657     device = _RecursiveFindBD(disk)
658     if device is None:
659       raise errors.BlockDeviceError("Block device '%s' is not set up." %
660                                     str(disk))
661     device.Open()
662     block_devices.append((disk, device))
663   return block_devices
664
665
666 def StartInstance(instance, extra_args):
667   """Start an instance.
668
669   Args:
670     instance - name of instance to start.
671
672   """
673   running_instances = GetInstanceList()
674
675   if instance.name in running_instances:
676     return True
677
678   block_devices = _GatherBlockDevs(instance)
679   hyper = hypervisor.GetHypervisor()
680
681   try:
682     hyper.StartInstance(instance, block_devices, extra_args)
683   except errors.HypervisorError, err:
684     logging.exception("Failed to start instance")
685     return False
686
687   return True
688
689
690 def ShutdownInstance(instance):
691   """Shut an instance down.
692
693   Args:
694     instance - name of instance to shutdown.
695
696   """
697   running_instances = GetInstanceList()
698
699   if instance.name not in running_instances:
700     return True
701
702   hyper = hypervisor.GetHypervisor()
703   try:
704     hyper.StopInstance(instance)
705   except errors.HypervisorError, err:
706     logging.error("Failed to stop instance")
707     return False
708
709   # test every 10secs for 2min
710   shutdown_ok = False
711
712   time.sleep(1)
713   for dummy in range(11):
714     if instance.name not in GetInstanceList():
715       break
716     time.sleep(10)
717   else:
718     # the shutdown did not succeed
719     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
720
721     try:
722       hyper.StopInstance(instance, force=True)
723     except errors.HypervisorError, err:
724       logging.exception("Failed to stop instance")
725       return False
726
727     time.sleep(1)
728     if instance.name in GetInstanceList():
729       logging.error("could not shutdown instance '%s' even by destroy",
730                     instance.name)
731       return False
732
733   return True
734
735
736 def RebootInstance(instance, reboot_type, extra_args):
737   """Reboot an instance.
738
739   Args:
740     instance    - name of instance to reboot
741     reboot_type - how to reboot [soft,hard,full]
742
743   """
744   running_instances = GetInstanceList()
745
746   if instance.name not in running_instances:
747     logging.error("Cannot reboot instance that is not running")
748     return False
749
750   hyper = hypervisor.GetHypervisor()
751   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
752     try:
753       hyper.RebootInstance(instance)
754     except errors.HypervisorError, err:
755       logging.exception("Failed to soft reboot instance")
756       return False
757   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
758     try:
759       ShutdownInstance(instance)
760       StartInstance(instance, extra_args)
761     except errors.HypervisorError, err:
762       logging.exception("Failed to hard reboot instance")
763       return False
764   else:
765     raise errors.ParameterError("reboot_type invalid")
766
767
768   return True
769
770
771 def MigrateInstance(instance, target, live):
772   """Migrates an instance to another node.
773
774   """
775   hyper = hypervisor.GetHypervisor()
776
777   try:
778     hyper.MigrateInstance(instance, target, live)
779   except errors.HypervisorError, err:
780     msg = "Failed to migrate instance: %s" % str(err)
781     logging.error(msg)
782     return (False, msg)
783   return (True, "Migration successfull")
784
785
786 def CreateBlockDevice(disk, size, owner, on_primary, info):
787   """Creates a block device for an instance.
788
789   Args:
790    disk: a ganeti.objects.Disk object
791    size: the size of the physical underlying device
792    owner: a string with the name of the instance
793    on_primary: a boolean indicating if it is the primary node or not
794    info: string that will be sent to the physical device creation
795
796   Returns:
797     the new unique_id of the device (this can sometime be
798     computed only after creation), or None. On secondary nodes,
799     it's not required to return anything.
800
801   """
802   clist = []
803   if disk.children:
804     for child in disk.children:
805       crdev = _RecursiveAssembleBD(child, owner, on_primary)
806       if on_primary or disk.AssembleOnSecondary():
807         # we need the children open in case the device itself has to
808         # be assembled
809         crdev.Open()
810       clist.append(crdev)
811   try:
812     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
813     if device is not None:
814       logging.info("removing existing device %s", disk)
815       device.Remove()
816   except errors.BlockDeviceError, err:
817     pass
818
819   device = bdev.Create(disk.dev_type, disk.physical_id,
820                        clist, size)
821   if device is None:
822     raise ValueError("Can't create child device for %s, %s" %
823                      (disk, size))
824   if on_primary or disk.AssembleOnSecondary():
825     if not device.Assemble():
826       errorstring = "Can't assemble device after creation"
827       logging.error(errorstring)
828       raise errors.BlockDeviceError("%s, very unusual event - check the node"
829                                     " daemon logs" % errorstring)
830     device.SetSyncSpeed(constants.SYNC_SPEED)
831     if on_primary or disk.OpenOnSecondary():
832       device.Open(force=True)
833     DevCacheManager.UpdateCache(device.dev_path, owner,
834                                 on_primary, disk.iv_name)
835
836   device.SetInfo(info)
837
838   physical_id = device.unique_id
839   return physical_id
840
841
842 def RemoveBlockDevice(disk):
843   """Remove a block device.
844
845   This is intended to be called recursively.
846
847   """
848   try:
849     # since we are removing the device, allow a partial match
850     # this allows removal of broken mirrors
851     rdev = _RecursiveFindBD(disk, allow_partial=True)
852   except errors.BlockDeviceError, err:
853     # probably can't attach
854     logging.info("Can't attach to device %s in remove", disk)
855     rdev = None
856   if rdev is not None:
857     r_path = rdev.dev_path
858     result = rdev.Remove()
859     if result:
860       DevCacheManager.RemoveCache(r_path)
861   else:
862     result = True
863   if disk.children:
864     for child in disk.children:
865       result = result and RemoveBlockDevice(child)
866   return result
867
868
869 def _RecursiveAssembleBD(disk, owner, as_primary):
870   """Activate a block device for an instance.
871
872   This is run on the primary and secondary nodes for an instance.
873
874   This function is called recursively.
875
876   Args:
877     disk: a objects.Disk object
878     as_primary: if we should make the block device read/write
879
880   Returns:
881     the assembled device or None (in case no device was assembled)
882
883   If the assembly is not successful, an exception is raised.
884
885   """
886   children = []
887   if disk.children:
888     mcn = disk.ChildrenNeeded()
889     if mcn == -1:
890       mcn = 0 # max number of Nones allowed
891     else:
892       mcn = len(disk.children) - mcn # max number of Nones
893     for chld_disk in disk.children:
894       try:
895         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
896       except errors.BlockDeviceError, err:
897         if children.count(None) >= mcn:
898           raise
899         cdev = None
900         logging.debug("Error in child activation: %s", str(err))
901       children.append(cdev)
902
903   if as_primary or disk.AssembleOnSecondary():
904     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
905     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
906     result = r_dev
907     if as_primary or disk.OpenOnSecondary():
908       r_dev.Open()
909     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
910                                 as_primary, disk.iv_name)
911
912   else:
913     result = True
914   return result
915
916
917 def AssembleBlockDevice(disk, owner, as_primary):
918   """Activate a block device for an instance.
919
920   This is a wrapper over _RecursiveAssembleBD.
921
922   Returns:
923     a /dev path for primary nodes
924     True for secondary nodes
925
926   """
927   result = _RecursiveAssembleBD(disk, owner, as_primary)
928   if isinstance(result, bdev.BlockDev):
929     result = result.dev_path
930   return result
931
932
933 def ShutdownBlockDevice(disk):
934   """Shut down a block device.
935
936   First, if the device is assembled (can `Attach()`), then the device
937   is shutdown. Then the children of the device are shutdown.
938
939   This function is called recursively. Note that we don't cache the
940   children or such, as oppossed to assemble, shutdown of different
941   devices doesn't require that the upper device was active.
942
943   """
944   r_dev = _RecursiveFindBD(disk)
945   if r_dev is not None:
946     r_path = r_dev.dev_path
947     result = r_dev.Shutdown()
948     if result:
949       DevCacheManager.RemoveCache(r_path)
950   else:
951     result = True
952   if disk.children:
953     for child in disk.children:
954       result = result and ShutdownBlockDevice(child)
955   return result
956
957
958 def MirrorAddChildren(parent_cdev, new_cdevs):
959   """Extend a mirrored block device.
960
961   """
962   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
963   if parent_bdev is None:
964     logging.error("Can't find parent device")
965     return False
966   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
967   if new_bdevs.count(None) > 0:
968     logging.error("Can't find new device(s) to add: %s:%s",
969                   new_bdevs, new_cdevs)
970     return False
971   parent_bdev.AddChildren(new_bdevs)
972   return True
973
974
975 def MirrorRemoveChildren(parent_cdev, new_cdevs):
976   """Shrink a mirrored block device.
977
978   """
979   parent_bdev = _RecursiveFindBD(parent_cdev)
980   if parent_bdev is None:
981     logging.error("Can't find parent in remove children: %s", parent_cdev)
982     return False
983   devs = []
984   for disk in new_cdevs:
985     rpath = disk.StaticDevPath()
986     if rpath is None:
987       bd = _RecursiveFindBD(disk)
988       if bd is None:
989         logging.error("Can't find dynamic device %s while removing children",
990                       disk)
991         return False
992       else:
993         devs.append(bd.dev_path)
994     else:
995       devs.append(rpath)
996   parent_bdev.RemoveChildren(devs)
997   return True
998
999
1000 def GetMirrorStatus(disks):
1001   """Get the mirroring status of a list of devices.
1002
1003   Args:
1004     disks: list of `objects.Disk`
1005
1006   Returns:
1007     list of (mirror_done, estimated_time) tuples, which
1008     are the result of bdev.BlockDevice.CombinedSyncStatus()
1009
1010   """
1011   stats = []
1012   for dsk in disks:
1013     rbd = _RecursiveFindBD(dsk)
1014     if rbd is None:
1015       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1016     stats.append(rbd.CombinedSyncStatus())
1017   return stats
1018
1019
1020 def _RecursiveFindBD(disk, allow_partial=False):
1021   """Check if a device is activated.
1022
1023   If so, return informations about the real device.
1024
1025   Args:
1026     disk: the objects.Disk instance
1027     allow_partial: don't abort the find if a child of the
1028                    device can't be found; this is intended to be
1029                    used when repairing mirrors
1030
1031   Returns:
1032     None if the device can't be found
1033     otherwise the device instance
1034
1035   """
1036   children = []
1037   if disk.children:
1038     for chdisk in disk.children:
1039       children.append(_RecursiveFindBD(chdisk))
1040
1041   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1042
1043
1044 def FindBlockDevice(disk):
1045   """Check if a device is activated.
1046
1047   If so, return informations about the real device.
1048
1049   Args:
1050     disk: the objects.Disk instance
1051   Returns:
1052     None if the device can't be found
1053     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1054
1055   """
1056   rbd = _RecursiveFindBD(disk)
1057   if rbd is None:
1058     return rbd
1059   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1060
1061
1062 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1063   """Write a file to the filesystem.
1064
1065   This allows the master to overwrite(!) a file. It will only perform
1066   the operation if the file belongs to a list of configuration files.
1067
1068   """
1069   if not os.path.isabs(file_name):
1070     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1071                   file_name)
1072     return False
1073
1074   allowed_files = [
1075     constants.CLUSTER_CONF_FILE,
1076     constants.ETC_HOSTS,
1077     constants.SSH_KNOWN_HOSTS_FILE,
1078     constants.VNC_PASSWORD_FILE,
1079     ]
1080
1081   if file_name not in allowed_files:
1082     logging.error("Filename passed to UploadFile not in allowed"
1083                  " upload targets: '%s'", file_name)
1084     return False
1085
1086   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1087                   atime=atime, mtime=mtime)
1088   return True
1089
1090
1091 def _ErrnoOrStr(err):
1092   """Format an EnvironmentError exception.
1093
1094   If the `err` argument has an errno attribute, it will be looked up
1095   and converted into a textual EXXXX description. Otherwise the string
1096   representation of the error will be returned.
1097
1098   """
1099   if hasattr(err, 'errno'):
1100     detail = errno.errorcode[err.errno]
1101   else:
1102     detail = str(err)
1103   return detail
1104
1105
1106 def _OSOndiskVersion(name, os_dir):
1107   """Compute and return the API version of a given OS.
1108
1109   This function will try to read the API version of the os given by
1110   the 'name' parameter and residing in the 'os_dir' directory.
1111
1112   Return value will be either an integer denoting the version or None in the
1113   case when this is not a valid OS name.
1114
1115   """
1116   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1117
1118   try:
1119     st = os.stat(api_file)
1120   except EnvironmentError, err:
1121     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1122                            " found (%s)" % _ErrnoOrStr(err))
1123
1124   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1125     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1126                            " a regular file")
1127
1128   try:
1129     f = open(api_file)
1130     try:
1131       api_version = f.read(256)
1132     finally:
1133       f.close()
1134   except EnvironmentError, err:
1135     raise errors.InvalidOS(name, os_dir, "error while reading the"
1136                            " API version (%s)" % _ErrnoOrStr(err))
1137
1138   api_version = api_version.strip()
1139   try:
1140     api_version = int(api_version)
1141   except (TypeError, ValueError), err:
1142     raise errors.InvalidOS(name, os_dir,
1143                            "API version is not integer (%s)" % str(err))
1144
1145   return api_version
1146
1147
1148 def DiagnoseOS(top_dirs=None):
1149   """Compute the validity for all OSes.
1150
1151   Returns an OS object for each name in all the given top directories
1152   (if not given defaults to constants.OS_SEARCH_PATH)
1153
1154   Returns:
1155     list of OS objects
1156
1157   """
1158   if top_dirs is None:
1159     top_dirs = constants.OS_SEARCH_PATH
1160
1161   result = []
1162   for dir_name in top_dirs:
1163     if os.path.isdir(dir_name):
1164       try:
1165         f_names = utils.ListVisibleFiles(dir_name)
1166       except EnvironmentError, err:
1167         logging.exception("Can't list the OS directory %s", dir_name)
1168         break
1169       for name in f_names:
1170         try:
1171           os_inst = OSFromDisk(name, base_dir=dir_name)
1172           result.append(os_inst)
1173         except errors.InvalidOS, err:
1174           result.append(objects.OS.FromInvalidOS(err))
1175
1176   return result
1177
1178
1179 def OSFromDisk(name, base_dir=None):
1180   """Create an OS instance from disk.
1181
1182   This function will return an OS instance if the given name is a
1183   valid OS name. Otherwise, it will raise an appropriate
1184   `errors.InvalidOS` exception, detailing why this is not a valid
1185   OS.
1186
1187   Args:
1188     os_dir: Directory containing the OS scripts. Defaults to a search
1189             in all the OS_SEARCH_PATH directories.
1190
1191   """
1192
1193   if base_dir is None:
1194     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1195     if os_dir is None:
1196       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1197   else:
1198     os_dir = os.path.sep.join([base_dir, name])
1199
1200   api_version = _OSOndiskVersion(name, os_dir)
1201
1202   if api_version != constants.OS_API_VERSION:
1203     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1204                            " (found %s want %s)"
1205                            % (api_version, constants.OS_API_VERSION))
1206
1207   # OS Scripts dictionary, we will populate it with the actual script names
1208   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1209
1210   for script in os_scripts:
1211     os_scripts[script] = os.path.sep.join([os_dir, script])
1212
1213     try:
1214       st = os.stat(os_scripts[script])
1215     except EnvironmentError, err:
1216       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1217                              (script, _ErrnoOrStr(err)))
1218
1219     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1220       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1221                              script)
1222
1223     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1224       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1225                              script)
1226
1227
1228   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1229                     create_script=os_scripts['create'],
1230                     export_script=os_scripts['export'],
1231                     import_script=os_scripts['import'],
1232                     rename_script=os_scripts['rename'],
1233                     api_version=api_version)
1234
1235
1236 def GrowBlockDevice(disk, amount):
1237   """Grow a stack of block devices.
1238
1239   This function is called recursively, with the childrens being the
1240   first one resize.
1241
1242   Args:
1243     disk: the disk to be grown
1244
1245   Returns: a tuple of (status, result), with:
1246     status: the result (true/false) of the operation
1247     result: the error message if the operation failed, otherwise not used
1248
1249   """
1250   r_dev = _RecursiveFindBD(disk)
1251   if r_dev is None:
1252     return False, "Cannot find block device %s" % (disk,)
1253
1254   try:
1255     r_dev.Grow(amount)
1256   except errors.BlockDeviceError, err:
1257     return False, str(err)
1258
1259   return True, None
1260
1261
1262 def SnapshotBlockDevice(disk):
1263   """Create a snapshot copy of a block device.
1264
1265   This function is called recursively, and the snapshot is actually created
1266   just for the leaf lvm backend device.
1267
1268   Args:
1269     disk: the disk to be snapshotted
1270
1271   Returns:
1272     a config entry for the actual lvm device snapshotted.
1273
1274   """
1275   if disk.children:
1276     if len(disk.children) == 1:
1277       # only one child, let's recurse on it
1278       return SnapshotBlockDevice(disk.children[0])
1279     else:
1280       # more than one child, choose one that matches
1281       for child in disk.children:
1282         if child.size == disk.size:
1283           # return implies breaking the loop
1284           return SnapshotBlockDevice(child)
1285   elif disk.dev_type == constants.LD_LV:
1286     r_dev = _RecursiveFindBD(disk)
1287     if r_dev is not None:
1288       # let's stay on the safe side and ask for the full size, for now
1289       return r_dev.Snapshot(disk.size)
1290     else:
1291       return None
1292   else:
1293     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1294                                  " '%s' of type '%s'" %
1295                                  (disk.unique_id, disk.dev_type))
1296
1297
1298 def ExportSnapshot(disk, dest_node, instance):
1299   """Export a block device snapshot to a remote node.
1300
1301   Args:
1302     disk: the snapshot block device
1303     dest_node: the node to send the image to
1304     instance: instance being exported
1305
1306   Returns:
1307     True if successful, False otherwise.
1308
1309   """
1310   inst_os = OSFromDisk(instance.os)
1311   export_script = inst_os.export_script
1312
1313   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1314                                      instance.name, int(time.time()))
1315   if not os.path.exists(constants.LOG_OS_DIR):
1316     os.mkdir(constants.LOG_OS_DIR, 0750)
1317
1318   real_os_dev = _RecursiveFindBD(disk)
1319   if real_os_dev is None:
1320     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1321                                   str(disk))
1322   real_os_dev.Open()
1323
1324   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1325   destfile = disk.physical_id[1]
1326
1327   # the target command is built out of three individual commands,
1328   # which are joined by pipes; we check each individual command for
1329   # valid parameters
1330
1331   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1332                                export_script, instance.name,
1333                                real_os_dev.dev_path, logfile)
1334
1335   comprcmd = "gzip"
1336
1337   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1338                                 destdir, destdir, destfile)
1339   remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1340                                        destcmd)
1341
1342   # all commands have been checked, so we're safe to combine them
1343   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1344
1345   result = utils.RunCmd(command)
1346
1347   if result.failed:
1348     logging.error("os snapshot export command '%s' returned error: %s"
1349                   " output: %s", command, result.fail_reason, result.output)
1350     return False
1351
1352   return True
1353
1354
1355 def FinalizeExport(instance, snap_disks):
1356   """Write out the export configuration information.
1357
1358   Args:
1359     instance: instance configuration
1360     snap_disks: snapshot block devices
1361
1362   Returns:
1363     False in case of error, True otherwise.
1364
1365   """
1366   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1367   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1368
1369   config = objects.SerializableConfigParser()
1370
1371   config.add_section(constants.INISECT_EXP)
1372   config.set(constants.INISECT_EXP, 'version', '0')
1373   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1374   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1375   config.set(constants.INISECT_EXP, 'os', instance.os)
1376   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1377
1378   config.add_section(constants.INISECT_INS)
1379   config.set(constants.INISECT_INS, 'name', instance.name)
1380   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1381   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1382   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1383
1384   nic_count = 0
1385   for nic_count, nic in enumerate(instance.nics):
1386     config.set(constants.INISECT_INS, 'nic%d_mac' %
1387                nic_count, '%s' % nic.mac)
1388     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1389     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1390                '%s' % nic.bridge)
1391   # TODO: redundant: on load can read nics until it doesn't exist
1392   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1393
1394   disk_count = 0
1395   for disk_count, disk in enumerate(snap_disks):
1396     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1397                ('%s' % disk.iv_name))
1398     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1399                ('%s' % disk.physical_id[1]))
1400     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1401                ('%d' % disk.size))
1402   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1403
1404   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1405   cfo = open(cff, 'w')
1406   try:
1407     config.write(cfo)
1408   finally:
1409     cfo.close()
1410
1411   shutil.rmtree(finaldestdir, True)
1412   shutil.move(destdir, finaldestdir)
1413
1414   return True
1415
1416
1417 def ExportInfo(dest):
1418   """Get export configuration information.
1419
1420   Args:
1421     dest: directory containing the export
1422
1423   Returns:
1424     A serializable config file containing the export info.
1425
1426   """
1427   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1428
1429   config = objects.SerializableConfigParser()
1430   config.read(cff)
1431
1432   if (not config.has_section(constants.INISECT_EXP) or
1433       not config.has_section(constants.INISECT_INS)):
1434     return None
1435
1436   return config
1437
1438
1439 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1440   """Import an os image into an instance.
1441
1442   Args:
1443     instance: the instance object
1444     os_disk: the instance-visible name of the os device
1445     swap_disk: the instance-visible name of the swap device
1446     src_node: node holding the source image
1447     src_image: path to the source image on src_node
1448
1449   Returns:
1450     False in case of error, True otherwise.
1451
1452   """
1453   cfg = _GetConfig()
1454   inst_os = OSFromDisk(instance.os)
1455   import_script = inst_os.import_script
1456
1457   os_device = instance.FindDisk(os_disk)
1458   if os_device is None:
1459     logging.error("Can't find this device-visible name '%s'", os_disk)
1460     return False
1461
1462   swap_device = instance.FindDisk(swap_disk)
1463   if swap_device is None:
1464     logging.error("Can't find this device-visible name '%s'", swap_disk)
1465     return False
1466
1467   real_os_dev = _RecursiveFindBD(os_device)
1468   if real_os_dev is None:
1469     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1470                                   str(os_device))
1471   real_os_dev.Open()
1472
1473   real_swap_dev = _RecursiveFindBD(swap_device)
1474   if real_swap_dev is None:
1475     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1476                                   str(swap_device))
1477   real_swap_dev.Open()
1478
1479   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1480                                         instance.name, int(time.time()))
1481   if not os.path.exists(constants.LOG_OS_DIR):
1482     os.mkdir(constants.LOG_OS_DIR, 0750)
1483
1484   destcmd = utils.BuildShellCmd('cat %s', src_image)
1485   remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
1486                                        destcmd)
1487
1488   comprcmd = "gunzip"
1489   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1490                                inst_os.path, import_script, instance.name,
1491                                real_os_dev.dev_path, real_swap_dev.dev_path,
1492                                logfile)
1493
1494   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1495   env = {'HYPERVISOR': cfg.GetHypervisorType()}
1496
1497   result = utils.RunCmd(command, env=env)
1498
1499   if result.failed:
1500     logging.error("os import command '%s' returned error: %s"
1501                   " output: %s", command, result.fail_reason, result.output)
1502     return False
1503
1504   return True
1505
1506
1507 def ListExports():
1508   """Return a list of exports currently available on this machine.
1509
1510   """
1511   if os.path.isdir(constants.EXPORT_DIR):
1512     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1513   else:
1514     return []
1515
1516
1517 def RemoveExport(export):
1518   """Remove an existing export from the node.
1519
1520   Args:
1521     export: the name of the export to remove
1522
1523   Returns:
1524     False in case of error, True otherwise.
1525
1526   """
1527   target = os.path.join(constants.EXPORT_DIR, export)
1528
1529   shutil.rmtree(target)
1530   # TODO: catch some of the relevant exceptions and provide a pretty
1531   # error message if rmtree fails.
1532
1533   return True
1534
1535
1536 def RenameBlockDevices(devlist):
1537   """Rename a list of block devices.
1538
1539   The devlist argument is a list of tuples (disk, new_logical,
1540   new_physical). The return value will be a combined boolean result
1541   (True only if all renames succeeded).
1542
1543   """
1544   result = True
1545   for disk, unique_id in devlist:
1546     dev = _RecursiveFindBD(disk)
1547     if dev is None:
1548       result = False
1549       continue
1550     try:
1551       old_rpath = dev.dev_path
1552       dev.Rename(unique_id)
1553       new_rpath = dev.dev_path
1554       if old_rpath != new_rpath:
1555         DevCacheManager.RemoveCache(old_rpath)
1556         # FIXME: we should add the new cache information here, like:
1557         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1558         # but we don't have the owner here - maybe parse from existing
1559         # cache? for now, we only lose lvm data when we rename, which
1560         # is less critical than DRBD or MD
1561     except errors.BlockDeviceError, err:
1562       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1563       result = False
1564   return result
1565
1566
1567 def _TransformFileStorageDir(file_storage_dir):
1568   """Checks whether given file_storage_dir is valid.
1569
1570   Checks wheter the given file_storage_dir is within the cluster-wide
1571   default file_storage_dir stored in SimpleStore. Only paths under that
1572   directory are allowed.
1573
1574   Args:
1575     file_storage_dir: string with path
1576
1577   Returns:
1578     normalized file_storage_dir (string) if valid, None otherwise
1579
1580   """
1581   cfg = _GetConfig()
1582   file_storage_dir = os.path.normpath(file_storage_dir)
1583   base_file_storage_dir = cfg.GetFileStorageDir()
1584   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1585       base_file_storage_dir):
1586     logging.error("file storage directory '%s' is not under base file"
1587                   " storage directory '%s'",
1588                   file_storage_dir, base_file_storage_dir)
1589     return None
1590   return file_storage_dir
1591
1592
1593 def CreateFileStorageDir(file_storage_dir):
1594   """Create file storage directory.
1595
1596   Args:
1597     file_storage_dir: string containing the path
1598
1599   Returns:
1600     tuple with first element a boolean indicating wheter dir
1601     creation was successful or not
1602
1603   """
1604   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1605   result = True,
1606   if not file_storage_dir:
1607     result = False,
1608   else:
1609     if os.path.exists(file_storage_dir):
1610       if not os.path.isdir(file_storage_dir):
1611         logging.error("'%s' is not a directory", file_storage_dir)
1612         result = False,
1613     else:
1614       try:
1615         os.makedirs(file_storage_dir, 0750)
1616       except OSError, err:
1617         logging.error("Cannot create file storage directory '%s': %s",
1618                       file_storage_dir, err)
1619         result = False,
1620   return result
1621
1622
1623 def RemoveFileStorageDir(file_storage_dir):
1624   """Remove file storage directory.
1625
1626   Remove it only if it's empty. If not log an error and return.
1627
1628   Args:
1629     file_storage_dir: string containing the path
1630
1631   Returns:
1632     tuple with first element a boolean indicating wheter dir
1633     removal was successful or not
1634
1635   """
1636   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1637   result = True,
1638   if not file_storage_dir:
1639     result = False,
1640   else:
1641     if os.path.exists(file_storage_dir):
1642       if not os.path.isdir(file_storage_dir):
1643         logging.error("'%s' is not a directory", file_storage_dir)
1644         result = False,
1645       # deletes dir only if empty, otherwise we want to return False
1646       try:
1647         os.rmdir(file_storage_dir)
1648       except OSError, err:
1649         logging.exception("Cannot remove file storage directory '%s'",
1650                           file_storage_dir)
1651         result = False,
1652   return result
1653
1654
1655 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1656   """Rename the file storage directory.
1657
1658   Args:
1659     old_file_storage_dir: string containing the old path
1660     new_file_storage_dir: string containing the new path
1661
1662   Returns:
1663     tuple with first element a boolean indicating wheter dir
1664     rename was successful or not
1665
1666   """
1667   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1668   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1669   result = True,
1670   if not old_file_storage_dir or not new_file_storage_dir:
1671     result = False,
1672   else:
1673     if not os.path.exists(new_file_storage_dir):
1674       if os.path.isdir(old_file_storage_dir):
1675         try:
1676           os.rename(old_file_storage_dir, new_file_storage_dir)
1677         except OSError, err:
1678           logging.exception("Cannot rename '%s' to '%s'",
1679                             old_file_storage_dir, new_file_storage_dir)
1680           result =  False,
1681       else:
1682         logging.error("'%s' is not a directory", old_file_storage_dir)
1683         result = False,
1684     else:
1685       if os.path.exists(old_file_storage_dir):
1686         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1687                       old_file_storage_dir, new_file_storage_dir)
1688         result = False,
1689   return result
1690
1691
1692 def _IsJobQueueFile(file_name):
1693   """Checks whether the given filename is in the queue directory.
1694
1695   """
1696   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1697   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1698
1699   if not result:
1700     logging.error("'%s' is not a file in the queue directory",
1701                   file_name)
1702
1703   return result
1704
1705
1706 def JobQueueUpdate(file_name, content):
1707   """Updates a file in the queue directory.
1708
1709   """
1710   if not _IsJobQueueFile(file_name):
1711     return False
1712
1713   # Write and replace the file atomically
1714   utils.WriteFile(file_name, data=content)
1715
1716   return True
1717
1718
1719 def JobQueueRename(old, new):
1720   """Renames a job queue file.
1721
1722   """
1723   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1724     return False
1725
1726   os.rename(old, new)
1727
1728   return True
1729
1730
1731 def CloseBlockDevices(disks):
1732   """Closes the given block devices.
1733
1734   This means they will be switched to secondary mode (in case of DRBD).
1735
1736   """
1737   bdevs = []
1738   for cf in disks:
1739     rd = _RecursiveFindBD(cf)
1740     if rd is None:
1741       return (False, "Can't find device %s" % cf)
1742     bdevs.append(rd)
1743
1744   msg = []
1745   for rd in bdevs:
1746     try:
1747       rd.Close()
1748     except errors.BlockDeviceError, err:
1749       msg.append(str(err))
1750   if msg:
1751     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1752   else:
1753     return (True, "All devices secondary")
1754
1755
1756 class HooksRunner(object):
1757   """Hook runner.
1758
1759   This class is instantiated on the node side (ganeti-noded) and not on
1760   the master side.
1761
1762   """
1763   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1764
1765   def __init__(self, hooks_base_dir=None):
1766     """Constructor for hooks runner.
1767
1768     Args:
1769       - hooks_base_dir: if not None, this overrides the
1770         constants.HOOKS_BASE_DIR (useful for unittests)
1771
1772     """
1773     if hooks_base_dir is None:
1774       hooks_base_dir = constants.HOOKS_BASE_DIR
1775     self._BASE_DIR = hooks_base_dir
1776
1777   @staticmethod
1778   def ExecHook(script, env):
1779     """Exec one hook script.
1780
1781     Args:
1782      - script: the full path to the script
1783      - env: the environment with which to exec the script
1784
1785     """
1786     # exec the process using subprocess and log the output
1787     fdstdin = None
1788     try:
1789       fdstdin = open("/dev/null", "r")
1790       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1791                                stderr=subprocess.STDOUT, close_fds=True,
1792                                shell=False, cwd="/", env=env)
1793       output = ""
1794       try:
1795         output = child.stdout.read(4096)
1796         child.stdout.close()
1797       except EnvironmentError, err:
1798         output += "Hook script error: %s" % str(err)
1799
1800       while True:
1801         try:
1802           result = child.wait()
1803           break
1804         except EnvironmentError, err:
1805           if err.errno == errno.EINTR:
1806             continue
1807           raise
1808     finally:
1809       # try not to leak fds
1810       for fd in (fdstdin, ):
1811         if fd is not None:
1812           try:
1813             fd.close()
1814           except EnvironmentError, err:
1815             # just log the error
1816             #logging.exception("Error while closing fd %s", fd)
1817             pass
1818
1819     return result == 0, output
1820
1821   def RunHooks(self, hpath, phase, env):
1822     """Run the scripts in the hooks directory.
1823
1824     This method will not be usually overriden by child opcodes.
1825
1826     """
1827     if phase == constants.HOOKS_PHASE_PRE:
1828       suffix = "pre"
1829     elif phase == constants.HOOKS_PHASE_POST:
1830       suffix = "post"
1831     else:
1832       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1833     rr = []
1834
1835     subdir = "%s-%s.d" % (hpath, suffix)
1836     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1837     try:
1838       dir_contents = utils.ListVisibleFiles(dir_name)
1839     except OSError, err:
1840       # must log
1841       return rr
1842
1843     # we use the standard python sort order,
1844     # so 00name is the recommended naming scheme
1845     dir_contents.sort()
1846     for relname in dir_contents:
1847       fname = os.path.join(dir_name, relname)
1848       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1849           self.RE_MASK.match(relname) is not None):
1850         rrval = constants.HKR_SKIP
1851         output = ""
1852       else:
1853         result, output = self.ExecHook(fname, env)
1854         if not result:
1855           rrval = constants.HKR_FAIL
1856         else:
1857           rrval = constants.HKR_SUCCESS
1858       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1859
1860     return rr
1861
1862
1863 class IAllocatorRunner(object):
1864   """IAllocator runner.
1865
1866   This class is instantiated on the node side (ganeti-noded) and not on
1867   the master side.
1868
1869   """
1870   def Run(self, name, idata):
1871     """Run an iallocator script.
1872
1873     Return value: tuple of:
1874        - run status (one of the IARUN_ constants)
1875        - stdout
1876        - stderr
1877        - fail reason (as from utils.RunResult)
1878
1879     """
1880     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1881                                   os.path.isfile)
1882     if alloc_script is None:
1883       return (constants.IARUN_NOTFOUND, None, None, None)
1884
1885     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1886     try:
1887       os.write(fd, idata)
1888       os.close(fd)
1889       result = utils.RunCmd([alloc_script, fin_name])
1890       if result.failed:
1891         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1892                 result.fail_reason)
1893     finally:
1894       os.unlink(fin_name)
1895
1896     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1897
1898
1899 class DevCacheManager(object):
1900   """Simple class for managing a cache of block device information.
1901
1902   """
1903   _DEV_PREFIX = "/dev/"
1904   _ROOT_DIR = constants.BDEV_CACHE_DIR
1905
1906   @classmethod
1907   def _ConvertPath(cls, dev_path):
1908     """Converts a /dev/name path to the cache file name.
1909
1910     This replaces slashes with underscores and strips the /dev
1911     prefix. It then returns the full path to the cache file
1912
1913     """
1914     if dev_path.startswith(cls._DEV_PREFIX):
1915       dev_path = dev_path[len(cls._DEV_PREFIX):]
1916     dev_path = dev_path.replace("/", "_")
1917     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1918     return fpath
1919
1920   @classmethod
1921   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1922     """Updates the cache information for a given device.
1923
1924     """
1925     if dev_path is None:
1926       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1927       return
1928     fpath = cls._ConvertPath(dev_path)
1929     if on_primary:
1930       state = "primary"
1931     else:
1932       state = "secondary"
1933     if iv_name is None:
1934       iv_name = "not_visible"
1935     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1936     try:
1937       utils.WriteFile(fpath, data=fdata)
1938     except EnvironmentError, err:
1939       logging.exception("Can't update bdev cache for %s", dev_path)
1940
1941   @classmethod
1942   def RemoveCache(cls, dev_path):
1943     """Remove data for a dev_path.
1944
1945     """
1946     if dev_path is None:
1947       logging.error("DevCacheManager.RemoveCache got a None dev_path")
1948       return
1949     fpath = cls._ConvertPath(dev_path)
1950     try:
1951       utils.RemoveFile(fpath)
1952     except EnvironmentError, err:
1953       logging.exception("Can't update bdev cache for %s", dev_path)