Fix a small typo in a constant
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetSshRunner():
48   return ssh.SshRunner()
49
50
51 def _CleanDirectory(path, exclude=[]):
52   """Removes all regular files in a directory.
53
54   @param exclude: List of files to be excluded.
55   @type exclude: list
56
57   """
58   if not os.path.isdir(path):
59     return
60
61   # Normalize excluded paths
62   exclude = [os.path.normpath(i) for i in exclude]
63
64   for rel_name in utils.ListVisibleFiles(path):
65     full_name = os.path.normpath(os.path.join(path, rel_name))
66     if full_name in exclude:
67       continue
68     if os.path.isfile(full_name) and not os.path.islink(full_name):
69       utils.RemoveFile(full_name)
70
71
72 def _JobQueuePurge(keep_lock):
73   """Removes job queue files and archived jobs
74
75   """
76   if keep_lock:
77     exclude = [constants.JOB_QUEUE_LOCK_FILE]
78   else:
79     exclude = []
80
81   _CleanDirectory(constants.QUEUE_DIR, exclude=exclude)
82   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
83
84
85 def _GetMasterInfo():
86   """Return the master ip and netdev.
87
88   """
89   try:
90     ss = ssconf.SimpleStore()
91     master_netdev = ss.GetMasterNetdev()
92     master_ip = ss.GetMasterIP()
93   except errors.ConfigurationError, err:
94     logging.exception("Cluster configuration incomplete")
95     return (None, None)
96   return (master_netdev, master_ip)
97
98
99 def StartMaster(start_daemons):
100   """Activate local node as master node.
101
102   The function will always try activate the IP address of the master
103   (if someone else has it, then it won't). Then, if the start_daemons
104   parameter is True, it will also start the master daemons
105   (ganet-masterd and ganeti-rapi).
106
107   """
108   ok = True
109   master_netdev, master_ip = _GetMasterInfo()
110   if not master_netdev:
111     return False
112
113   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
114     if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
115                      source=constants.LOCALHOST_IP_ADDRESS):
116       # we already have the ip:
117       logging.debug("Already started")
118     else:
119       logging.error("Someone else has the master ip, not activating")
120       ok = False
121   else:
122     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
123                            "dev", master_netdev, "label",
124                            "%s:0" % master_netdev])
125     if result.failed:
126       logging.error("Can't activate master IP: %s", result.output)
127       ok = False
128
129     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
130                            "-s", master_ip, master_ip])
131     # we'll ignore the exit code of arping
132
133   # and now start the master and rapi daemons
134   if start_daemons:
135     for daemon in 'ganeti-masterd', 'ganeti-rapi':
136       result = utils.RunCmd([daemon])
137       if result.failed:
138         logging.error("Can't start daemon %s: %s", daemon, result.output)
139         ok = False
140   return ok
141
142
143 def StopMaster(stop_daemons):
144   """Deactivate this node as master.
145
146   The function will always try to deactivate the IP address of the
147   master. Then, if the stop_daemons parameter is True, it will also
148   stop the master daemons (ganet-masterd and ganeti-rapi).
149
150   """
151   master_netdev, master_ip = _GetMasterInfo()
152   if not master_netdev:
153     return False
154
155   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
156                          "dev", master_netdev])
157   if result.failed:
158     logging.error("Can't remove the master IP, error: %s", result.output)
159     # but otherwise ignore the failure
160
161   if stop_daemons:
162     # stop/kill the rapi and the master daemon
163     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
164       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
165
166   return True
167
168
169 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
170   """Joins this node to the cluster.
171
172   This does the following:
173       - updates the hostkeys of the machine (rsa and dsa)
174       - adds the ssh private key to the user
175       - adds the ssh public key to the users' authorized_keys file
176
177   """
178   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
179                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
180                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
181                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
182   for name, content, mode in sshd_keys:
183     utils.WriteFile(name, data=content, mode=mode)
184
185   try:
186     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
187                                                     mkdir=True)
188   except errors.OpExecError, err:
189     logging.exception("Error while processing user ssh files")
190     return False
191
192   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
193     utils.WriteFile(name, data=content, mode=0600)
194
195   utils.AddAuthorizedKey(auth_keys, sshpub)
196
197   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
198
199   return True
200
201
202 def LeaveCluster():
203   """Cleans up the current node and prepares it to be removed from the cluster.
204
205   """
206   _CleanDirectory(constants.DATA_DIR)
207
208   # The lock can be removed because we're going to quit anyway.
209   _JobQueuePurge(keep_lock=False)
210
211   try:
212     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
213   except errors.OpExecError:
214     logging.exception("Error while processing ssh files")
215     return
216
217   f = open(pub_key, 'r')
218   try:
219     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
220   finally:
221     f.close()
222
223   utils.RemoveFile(priv_key)
224   utils.RemoveFile(pub_key)
225
226   # Return a reassuring string to the caller, and quit
227   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
228
229
230 def GetNodeInfo(vgname):
231   """Gives back a hash with different informations about the node.
232
233   Returns:
234     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
235       'memory_free' : xxx, 'memory_total' : xxx }
236     where
237     vg_size is the size of the configured volume group in MiB
238     vg_free is the free size of the volume group in MiB
239     memory_dom0 is the memory allocated for domain0 in MiB
240     memory_free is the currently available (free) ram in MiB
241     memory_total is the total number of ram in MiB
242
243   """
244   outputarray = {}
245   vginfo = _GetVGInfo(vgname)
246   outputarray['vg_size'] = vginfo['vg_size']
247   outputarray['vg_free'] = vginfo['vg_free']
248
249   hyper = hypervisor.GetHypervisor()
250   hyp_info = hyper.GetNodeInfo()
251   if hyp_info is not None:
252     outputarray.update(hyp_info)
253
254   f = open("/proc/sys/kernel/random/boot_id", 'r')
255   try:
256     outputarray["bootid"] = f.read(128).rstrip("\n")
257   finally:
258     f.close()
259
260   return outputarray
261
262
263 def VerifyNode(what):
264   """Verify the status of the local node.
265
266   Args:
267     what - a dictionary of things to check:
268       'filelist' : list of files for which to compute checksums
269       'nodelist' : list of nodes we should check communication with
270       'hypervisor': run the hypervisor-specific verify
271
272   Requested files on local node are checksummed and the result returned.
273
274   The nodelist is traversed, with the following checks being made
275   for each node:
276   - known_hosts key correct
277   - correct resolving of node name (target node returns its own hostname
278     by ssh-execution of 'hostname', result compared against name in list.
279
280   """
281   result = {}
282
283   if 'hypervisor' in what:
284     result['hypervisor'] = hypervisor.GetHypervisor().Verify()
285
286   if 'filelist' in what:
287     result['filelist'] = utils.FingerprintFiles(what['filelist'])
288
289   if 'nodelist' in what:
290     result['nodelist'] = {}
291     random.shuffle(what['nodelist'])
292     for node in what['nodelist']:
293       success, message = _GetSshRunner().VerifyNodeHostname(node)
294       if not success:
295         result['nodelist'][node] = message
296   if 'node-net-test' in what:
297     result['node-net-test'] = {}
298     my_name = utils.HostInfo().name
299     my_pip = my_sip = None
300     for name, pip, sip in what['node-net-test']:
301       if name == my_name:
302         my_pip = pip
303         my_sip = sip
304         break
305     if not my_pip:
306       result['node-net-test'][my_name] = ("Can't find my own"
307                                           " primary/secondary IP"
308                                           " in the node list")
309     else:
310       port = ssconf.SimpleStore().GetNodeDaemonPort()
311       for name, pip, sip in what['node-net-test']:
312         fail = []
313         if not utils.TcpPing(pip, port, source=my_pip):
314           fail.append("primary")
315         if sip != pip:
316           if not utils.TcpPing(sip, port, source=my_sip):
317             fail.append("secondary")
318         if fail:
319           result['node-net-test'][name] = ("failure using the %s"
320                                            " interface(s)" %
321                                            " and ".join(fail))
322
323   return result
324
325
326 def GetVolumeList(vg_name):
327   """Compute list of logical volumes and their size.
328
329   Returns:
330     dictionary of all partions (key) with their size (in MiB), inactive
331     and online status:
332     {'test1': ('20.06', True, True)}
333
334   """
335   lvs = {}
336   sep = '|'
337   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
338                          "--separator=%s" % sep,
339                          "-olv_name,lv_size,lv_attr", vg_name])
340   if result.failed:
341     logging.error("Failed to list logical volumes, lvs output: %s",
342                   result.output)
343     return result.output
344
345   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
346   for line in result.stdout.splitlines():
347     line = line.strip()
348     match = valid_line_re.match(line)
349     if not match:
350       logging.error("Invalid line returned from lvs output: '%s'", line)
351       continue
352     name, size, attr = match.groups()
353     inactive = attr[4] == '-'
354     online = attr[5] == 'o'
355     lvs[name] = (size, inactive, online)
356
357   return lvs
358
359
360 def ListVolumeGroups():
361   """List the volume groups and their size.
362
363   Returns:
364     Dictionary with keys volume name and values the size of the volume
365
366   """
367   return utils.ListVolumeGroups()
368
369
370 def NodeVolumes():
371   """List all volumes on this node.
372
373   """
374   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
375                          "--separator=|",
376                          "--options=lv_name,lv_size,devices,vg_name"])
377   if result.failed:
378     logging.error("Failed to list logical volumes, lvs output: %s",
379                   result.output)
380     return {}
381
382   def parse_dev(dev):
383     if '(' in dev:
384       return dev.split('(')[0]
385     else:
386       return dev
387
388   def map_line(line):
389     return {
390       'name': line[0].strip(),
391       'size': line[1].strip(),
392       'dev': parse_dev(line[2].strip()),
393       'vg': line[3].strip(),
394     }
395
396   return [map_line(line.split('|')) for line in result.stdout.splitlines()
397           if line.count('|') >= 3]
398
399
400 def BridgesExist(bridges_list):
401   """Check if a list of bridges exist on the current node.
402
403   Returns:
404     True if all of them exist, false otherwise
405
406   """
407   for bridge in bridges_list:
408     if not utils.BridgeExists(bridge):
409       return False
410
411   return True
412
413
414 def GetInstanceList():
415   """Provides a list of instances.
416
417   Returns:
418     A list of all running instances on the current node
419     - instance1.example.com
420     - instance2.example.com
421
422   """
423   try:
424     names = hypervisor.GetHypervisor().ListInstances()
425   except errors.HypervisorError, err:
426     logging.exception("Error enumerating instances")
427     raise
428
429   return names
430
431
432 def GetInstanceInfo(instance):
433   """Gives back the informations about an instance as a dictionary.
434
435   Args:
436     instance: name of the instance (ex. instance1.example.com)
437
438   Returns:
439     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
440     where
441     memory: memory size of instance (int)
442     state: xen state of instance (string)
443     time: cpu time of instance (float)
444
445   """
446   output = {}
447
448   iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
449   if iinfo is not None:
450     output['memory'] = iinfo[2]
451     output['state'] = iinfo[4]
452     output['time'] = iinfo[5]
453
454   return output
455
456
457 def GetAllInstancesInfo():
458   """Gather data about all instances.
459
460   This is the equivalent of `GetInstanceInfo()`, except that it
461   computes data for all instances at once, thus being faster if one
462   needs data about more than one instance.
463
464   Returns: a dictionary of dictionaries, keys being the instance name,
465     and with values:
466     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
467     where
468     memory: memory size of instance (int)
469     state: xen state of instance (string)
470     time: cpu time of instance (float)
471     vcpus: the number of cpus
472
473   """
474   output = {}
475
476   iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
477   if iinfo:
478     for name, inst_id, memory, vcpus, state, times in iinfo:
479       output[name] = {
480         'memory': memory,
481         'vcpus': vcpus,
482         'state': state,
483         'time': times,
484         }
485
486   return output
487
488
489 def AddOSToInstance(instance, os_disk, swap_disk):
490   """Add an OS to an instance.
491
492   Args:
493     instance: the instance object
494     os_disk: the instance-visible name of the os device
495     swap_disk: the instance-visible name of the swap device
496
497   """
498   inst_os = OSFromDisk(instance.os)
499
500   create_script = inst_os.create_script
501
502   os_device = instance.FindDisk(os_disk)
503   if os_device is None:
504     logging.error("Can't find this device-visible name '%s'", os_disk)
505     return False
506
507   swap_device = instance.FindDisk(swap_disk)
508   if swap_device is None:
509     logging.error("Can't find this device-visible name '%s'", swap_disk)
510     return False
511
512   real_os_dev = _RecursiveFindBD(os_device)
513   if real_os_dev is None:
514     raise errors.BlockDeviceError("Block device '%s' is not set up" %
515                                   str(os_device))
516   real_os_dev.Open()
517
518   real_swap_dev = _RecursiveFindBD(swap_device)
519   if real_swap_dev is None:
520     raise errors.BlockDeviceError("Block device '%s' is not set up" %
521                                   str(swap_device))
522   real_swap_dev.Open()
523
524   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
525                                      instance.name, int(time.time()))
526   if not os.path.exists(constants.LOG_OS_DIR):
527     os.mkdir(constants.LOG_OS_DIR, 0750)
528
529   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
530                                 inst_os.path, create_script, instance.name,
531                                 real_os_dev.dev_path, real_swap_dev.dev_path,
532                                 logfile)
533   env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
534
535   result = utils.RunCmd(command, env=env)
536   if result.failed:
537     logging.error("os create command '%s' returned error: %s, logfile: %s,"
538                   " output: %s", command, result.fail_reason, logfile,
539                   result.output)
540     return False
541
542   return True
543
544
545 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
546   """Run the OS rename script for an instance.
547
548   Args:
549     instance: the instance object
550     old_name: the old name of the instance
551     os_disk: the instance-visible name of the os device
552     swap_disk: the instance-visible name of the swap device
553
554   """
555   inst_os = OSFromDisk(instance.os)
556
557   script = inst_os.rename_script
558
559   os_device = instance.FindDisk(os_disk)
560   if os_device is None:
561     logging.error("Can't find this device-visible name '%s'", os_disk)
562     return False
563
564   swap_device = instance.FindDisk(swap_disk)
565   if swap_device is None:
566     logging.error("Can't find this device-visible name '%s'", swap_disk)
567     return False
568
569   real_os_dev = _RecursiveFindBD(os_device)
570   if real_os_dev is None:
571     raise errors.BlockDeviceError("Block device '%s' is not set up" %
572                                   str(os_device))
573   real_os_dev.Open()
574
575   real_swap_dev = _RecursiveFindBD(swap_device)
576   if real_swap_dev is None:
577     raise errors.BlockDeviceError("Block device '%s' is not set up" %
578                                   str(swap_device))
579   real_swap_dev.Open()
580
581   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
582                                            old_name,
583                                            instance.name, int(time.time()))
584   if not os.path.exists(constants.LOG_OS_DIR):
585     os.mkdir(constants.LOG_OS_DIR, 0750)
586
587   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
588                                 inst_os.path, script, old_name, instance.name,
589                                 real_os_dev.dev_path, real_swap_dev.dev_path,
590                                 logfile)
591
592   result = utils.RunCmd(command)
593
594   if result.failed:
595     logging.error("os create command '%s' returned error: %s output: %s",
596                   command, result.fail_reason, result.output)
597     return False
598
599   return True
600
601
602 def _GetVGInfo(vg_name):
603   """Get informations about the volume group.
604
605   Args:
606     vg_name: the volume group
607
608   Returns:
609     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
610     where
611     vg_size is the total size of the volume group in MiB
612     vg_free is the free size of the volume group in MiB
613     pv_count are the number of physical disks in that vg
614
615   If an error occurs during gathering of data, we return the same dict
616   with keys all set to None.
617
618   """
619   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
620
621   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
622                          "--nosuffix", "--units=m", "--separator=:", vg_name])
623
624   if retval.failed:
625     logging.error("volume group %s not present", vg_name)
626     return retdic
627   valarr = retval.stdout.strip().rstrip(':').split(':')
628   if len(valarr) == 3:
629     try:
630       retdic = {
631         "vg_size": int(round(float(valarr[0]), 0)),
632         "vg_free": int(round(float(valarr[1]), 0)),
633         "pv_count": int(valarr[2]),
634         }
635     except ValueError, err:
636       logging.exception("Fail to parse vgs output")
637   else:
638     logging.error("vgs output has the wrong number of fields (expected"
639                   " three): %s", str(valarr))
640   return retdic
641
642
643 def _GatherBlockDevs(instance):
644   """Set up an instance's block device(s).
645
646   This is run on the primary node at instance startup. The block
647   devices must be already assembled.
648
649   """
650   block_devices = []
651   for disk in instance.disks:
652     device = _RecursiveFindBD(disk)
653     if device is None:
654       raise errors.BlockDeviceError("Block device '%s' is not set up." %
655                                     str(disk))
656     device.Open()
657     block_devices.append((disk, device))
658   return block_devices
659
660
661 def StartInstance(instance, extra_args):
662   """Start an instance.
663
664   Args:
665     instance - name of instance to start.
666
667   """
668   running_instances = GetInstanceList()
669
670   if instance.name in running_instances:
671     return True
672
673   block_devices = _GatherBlockDevs(instance)
674   hyper = hypervisor.GetHypervisor()
675
676   try:
677     hyper.StartInstance(instance, block_devices, extra_args)
678   except errors.HypervisorError, err:
679     logging.exception("Failed to start instance")
680     return False
681
682   return True
683
684
685 def ShutdownInstance(instance):
686   """Shut an instance down.
687
688   Args:
689     instance - name of instance to shutdown.
690
691   """
692   running_instances = GetInstanceList()
693
694   if instance.name not in running_instances:
695     return True
696
697   hyper = hypervisor.GetHypervisor()
698   try:
699     hyper.StopInstance(instance)
700   except errors.HypervisorError, err:
701     logging.error("Failed to stop instance")
702     return False
703
704   # test every 10secs for 2min
705   shutdown_ok = False
706
707   time.sleep(1)
708   for dummy in range(11):
709     if instance.name not in GetInstanceList():
710       break
711     time.sleep(10)
712   else:
713     # the shutdown did not succeed
714     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
715
716     try:
717       hyper.StopInstance(instance, force=True)
718     except errors.HypervisorError, err:
719       logging.exception("Failed to stop instance")
720       return False
721
722     time.sleep(1)
723     if instance.name in GetInstanceList():
724       logging.error("could not shutdown instance '%s' even by destroy",
725                     instance.name)
726       return False
727
728   return True
729
730
731 def RebootInstance(instance, reboot_type, extra_args):
732   """Reboot an instance.
733
734   Args:
735     instance    - name of instance to reboot
736     reboot_type - how to reboot [soft,hard,full]
737
738   """
739   running_instances = GetInstanceList()
740
741   if instance.name not in running_instances:
742     logging.error("Cannot reboot instance that is not running")
743     return False
744
745   hyper = hypervisor.GetHypervisor()
746   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
747     try:
748       hyper.RebootInstance(instance)
749     except errors.HypervisorError, err:
750       logging.exception("Failed to soft reboot instance")
751       return False
752   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
753     try:
754       ShutdownInstance(instance)
755       StartInstance(instance, extra_args)
756     except errors.HypervisorError, err:
757       logging.exception("Failed to hard reboot instance")
758       return False
759   else:
760     raise errors.ParameterError("reboot_type invalid")
761
762
763   return True
764
765
766 def MigrateInstance(instance, target, live):
767   """Migrates an instance to another node.
768
769   """
770   hyper = hypervisor.GetHypervisor()
771
772   try:
773     hyper.MigrateInstance(instance, target, live)
774   except errors.HypervisorError, err:
775     msg = "Failed to migrate instance: %s" % str(err)
776     logging.error(msg)
777     return (False, msg)
778   return (True, "Migration successfull")
779
780
781 def CreateBlockDevice(disk, size, owner, on_primary, info):
782   """Creates a block device for an instance.
783
784   Args:
785    disk: a ganeti.objects.Disk object
786    size: the size of the physical underlying device
787    owner: a string with the name of the instance
788    on_primary: a boolean indicating if it is the primary node or not
789    info: string that will be sent to the physical device creation
790
791   Returns:
792     the new unique_id of the device (this can sometime be
793     computed only after creation), or None. On secondary nodes,
794     it's not required to return anything.
795
796   """
797   clist = []
798   if disk.children:
799     for child in disk.children:
800       crdev = _RecursiveAssembleBD(child, owner, on_primary)
801       if on_primary or disk.AssembleOnSecondary():
802         # we need the children open in case the device itself has to
803         # be assembled
804         crdev.Open()
805       clist.append(crdev)
806   try:
807     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
808     if device is not None:
809       logging.info("removing existing device %s", disk)
810       device.Remove()
811   except errors.BlockDeviceError, err:
812     pass
813
814   device = bdev.Create(disk.dev_type, disk.physical_id,
815                        clist, size)
816   if device is None:
817     raise ValueError("Can't create child device for %s, %s" %
818                      (disk, size))
819   if on_primary or disk.AssembleOnSecondary():
820     if not device.Assemble():
821       errorstring = "Can't assemble device after creation"
822       logging.error(errorstring)
823       raise errors.BlockDeviceError("%s, very unusual event - check the node"
824                                     " daemon logs" % errorstring)
825     device.SetSyncSpeed(constants.SYNC_SPEED)
826     if on_primary or disk.OpenOnSecondary():
827       device.Open(force=True)
828     DevCacheManager.UpdateCache(device.dev_path, owner,
829                                 on_primary, disk.iv_name)
830
831   device.SetInfo(info)
832
833   physical_id = device.unique_id
834   return physical_id
835
836
837 def RemoveBlockDevice(disk):
838   """Remove a block device.
839
840   This is intended to be called recursively.
841
842   """
843   try:
844     # since we are removing the device, allow a partial match
845     # this allows removal of broken mirrors
846     rdev = _RecursiveFindBD(disk, allow_partial=True)
847   except errors.BlockDeviceError, err:
848     # probably can't attach
849     logging.info("Can't attach to device %s in remove", disk)
850     rdev = None
851   if rdev is not None:
852     r_path = rdev.dev_path
853     result = rdev.Remove()
854     if result:
855       DevCacheManager.RemoveCache(r_path)
856   else:
857     result = True
858   if disk.children:
859     for child in disk.children:
860       result = result and RemoveBlockDevice(child)
861   return result
862
863
864 def _RecursiveAssembleBD(disk, owner, as_primary):
865   """Activate a block device for an instance.
866
867   This is run on the primary and secondary nodes for an instance.
868
869   This function is called recursively.
870
871   Args:
872     disk: a objects.Disk object
873     as_primary: if we should make the block device read/write
874
875   Returns:
876     the assembled device or None (in case no device was assembled)
877
878   If the assembly is not successful, an exception is raised.
879
880   """
881   children = []
882   if disk.children:
883     mcn = disk.ChildrenNeeded()
884     if mcn == -1:
885       mcn = 0 # max number of Nones allowed
886     else:
887       mcn = len(disk.children) - mcn # max number of Nones
888     for chld_disk in disk.children:
889       try:
890         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
891       except errors.BlockDeviceError, err:
892         if children.count(None) >= mcn:
893           raise
894         cdev = None
895         logging.debug("Error in child activation: %s", str(err))
896       children.append(cdev)
897
898   if as_primary or disk.AssembleOnSecondary():
899     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
900     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
901     result = r_dev
902     if as_primary or disk.OpenOnSecondary():
903       r_dev.Open()
904     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
905                                 as_primary, disk.iv_name)
906
907   else:
908     result = True
909   return result
910
911
912 def AssembleBlockDevice(disk, owner, as_primary):
913   """Activate a block device for an instance.
914
915   This is a wrapper over _RecursiveAssembleBD.
916
917   Returns:
918     a /dev path for primary nodes
919     True for secondary nodes
920
921   """
922   result = _RecursiveAssembleBD(disk, owner, as_primary)
923   if isinstance(result, bdev.BlockDev):
924     result = result.dev_path
925   return result
926
927
928 def ShutdownBlockDevice(disk):
929   """Shut down a block device.
930
931   First, if the device is assembled (can `Attach()`), then the device
932   is shutdown. Then the children of the device are shutdown.
933
934   This function is called recursively. Note that we don't cache the
935   children or such, as oppossed to assemble, shutdown of different
936   devices doesn't require that the upper device was active.
937
938   """
939   r_dev = _RecursiveFindBD(disk)
940   if r_dev is not None:
941     r_path = r_dev.dev_path
942     result = r_dev.Shutdown()
943     if result:
944       DevCacheManager.RemoveCache(r_path)
945   else:
946     result = True
947   if disk.children:
948     for child in disk.children:
949       result = result and ShutdownBlockDevice(child)
950   return result
951
952
953 def MirrorAddChildren(parent_cdev, new_cdevs):
954   """Extend a mirrored block device.
955
956   """
957   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
958   if parent_bdev is None:
959     logging.error("Can't find parent device")
960     return False
961   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
962   if new_bdevs.count(None) > 0:
963     logging.error("Can't find new device(s) to add: %s:%s",
964                   new_bdevs, new_cdevs)
965     return False
966   parent_bdev.AddChildren(new_bdevs)
967   return True
968
969
970 def MirrorRemoveChildren(parent_cdev, new_cdevs):
971   """Shrink a mirrored block device.
972
973   """
974   parent_bdev = _RecursiveFindBD(parent_cdev)
975   if parent_bdev is None:
976     logging.error("Can't find parent in remove children: %s", parent_cdev)
977     return False
978   devs = []
979   for disk in new_cdevs:
980     rpath = disk.StaticDevPath()
981     if rpath is None:
982       bd = _RecursiveFindBD(disk)
983       if bd is None:
984         logging.error("Can't find dynamic device %s while removing children",
985                       disk)
986         return False
987       else:
988         devs.append(bd.dev_path)
989     else:
990       devs.append(rpath)
991   parent_bdev.RemoveChildren(devs)
992   return True
993
994
995 def GetMirrorStatus(disks):
996   """Get the mirroring status of a list of devices.
997
998   Args:
999     disks: list of `objects.Disk`
1000
1001   Returns:
1002     list of (mirror_done, estimated_time) tuples, which
1003     are the result of bdev.BlockDevice.CombinedSyncStatus()
1004
1005   """
1006   stats = []
1007   for dsk in disks:
1008     rbd = _RecursiveFindBD(dsk)
1009     if rbd is None:
1010       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1011     stats.append(rbd.CombinedSyncStatus())
1012   return stats
1013
1014
1015 def _RecursiveFindBD(disk, allow_partial=False):
1016   """Check if a device is activated.
1017
1018   If so, return informations about the real device.
1019
1020   Args:
1021     disk: the objects.Disk instance
1022     allow_partial: don't abort the find if a child of the
1023                    device can't be found; this is intended to be
1024                    used when repairing mirrors
1025
1026   Returns:
1027     None if the device can't be found
1028     otherwise the device instance
1029
1030   """
1031   children = []
1032   if disk.children:
1033     for chdisk in disk.children:
1034       children.append(_RecursiveFindBD(chdisk))
1035
1036   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1037
1038
1039 def FindBlockDevice(disk):
1040   """Check if a device is activated.
1041
1042   If so, return informations about the real device.
1043
1044   Args:
1045     disk: the objects.Disk instance
1046   Returns:
1047     None if the device can't be found
1048     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1049
1050   """
1051   rbd = _RecursiveFindBD(disk)
1052   if rbd is None:
1053     return rbd
1054   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1055
1056
1057 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1058   """Write a file to the filesystem.
1059
1060   This allows the master to overwrite(!) a file. It will only perform
1061   the operation if the file belongs to a list of configuration files.
1062
1063   """
1064   if not os.path.isabs(file_name):
1065     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1066                   file_name)
1067     return False
1068
1069   allowed_files = [
1070     constants.CLUSTER_CONF_FILE,
1071     constants.ETC_HOSTS,
1072     constants.SSH_KNOWN_HOSTS_FILE,
1073     constants.VNC_PASSWORD_FILE,
1074     ]
1075   allowed_files.extend(ssconf.SimpleStore().GetFileList())
1076
1077   if file_name not in allowed_files:
1078     logging.error("Filename passed to UploadFile not in allowed"
1079                  " upload targets: '%s'", file_name)
1080     return False
1081
1082   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1083                   atime=atime, mtime=mtime)
1084   return True
1085
1086
1087 def _ErrnoOrStr(err):
1088   """Format an EnvironmentError exception.
1089
1090   If the `err` argument has an errno attribute, it will be looked up
1091   and converted into a textual EXXXX description. Otherwise the string
1092   representation of the error will be returned.
1093
1094   """
1095   if hasattr(err, 'errno'):
1096     detail = errno.errorcode[err.errno]
1097   else:
1098     detail = str(err)
1099   return detail
1100
1101
1102 def _OSOndiskVersion(name, os_dir):
1103   """Compute and return the API version of a given OS.
1104
1105   This function will try to read the API version of the os given by
1106   the 'name' parameter and residing in the 'os_dir' directory.
1107
1108   Return value will be either an integer denoting the version or None in the
1109   case when this is not a valid OS name.
1110
1111   """
1112   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1113
1114   try:
1115     st = os.stat(api_file)
1116   except EnvironmentError, err:
1117     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1118                            " found (%s)" % _ErrnoOrStr(err))
1119
1120   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1121     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1122                            " a regular file")
1123
1124   try:
1125     f = open(api_file)
1126     try:
1127       api_version = f.read(256)
1128     finally:
1129       f.close()
1130   except EnvironmentError, err:
1131     raise errors.InvalidOS(name, os_dir, "error while reading the"
1132                            " API version (%s)" % _ErrnoOrStr(err))
1133
1134   api_version = api_version.strip()
1135   try:
1136     api_version = int(api_version)
1137   except (TypeError, ValueError), err:
1138     raise errors.InvalidOS(name, os_dir,
1139                            "API version is not integer (%s)" % str(err))
1140
1141   return api_version
1142
1143
1144 def DiagnoseOS(top_dirs=None):
1145   """Compute the validity for all OSes.
1146
1147   Returns an OS object for each name in all the given top directories
1148   (if not given defaults to constants.OS_SEARCH_PATH)
1149
1150   Returns:
1151     list of OS objects
1152
1153   """
1154   if top_dirs is None:
1155     top_dirs = constants.OS_SEARCH_PATH
1156
1157   result = []
1158   for dir_name in top_dirs:
1159     if os.path.isdir(dir_name):
1160       try:
1161         f_names = utils.ListVisibleFiles(dir_name)
1162       except EnvironmentError, err:
1163         logging.exception("Can't list the OS directory %s", dir_name)
1164         break
1165       for name in f_names:
1166         try:
1167           os_inst = OSFromDisk(name, base_dir=dir_name)
1168           result.append(os_inst)
1169         except errors.InvalidOS, err:
1170           result.append(objects.OS.FromInvalidOS(err))
1171
1172   return result
1173
1174
1175 def OSFromDisk(name, base_dir=None):
1176   """Create an OS instance from disk.
1177
1178   This function will return an OS instance if the given name is a
1179   valid OS name. Otherwise, it will raise an appropriate
1180   `errors.InvalidOS` exception, detailing why this is not a valid
1181   OS.
1182
1183   Args:
1184     os_dir: Directory containing the OS scripts. Defaults to a search
1185             in all the OS_SEARCH_PATH directories.
1186
1187   """
1188
1189   if base_dir is None:
1190     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1191     if os_dir is None:
1192       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1193   else:
1194     os_dir = os.path.sep.join([base_dir, name])
1195
1196   api_version = _OSOndiskVersion(name, os_dir)
1197
1198   if api_version != constants.OS_API_VERSION:
1199     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1200                            " (found %s want %s)"
1201                            % (api_version, constants.OS_API_VERSION))
1202
1203   # OS Scripts dictionary, we will populate it with the actual script names
1204   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1205
1206   for script in os_scripts:
1207     os_scripts[script] = os.path.sep.join([os_dir, script])
1208
1209     try:
1210       st = os.stat(os_scripts[script])
1211     except EnvironmentError, err:
1212       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1213                              (script, _ErrnoOrStr(err)))
1214
1215     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1216       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1217                              script)
1218
1219     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1220       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1221                              script)
1222
1223
1224   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1225                     create_script=os_scripts['create'],
1226                     export_script=os_scripts['export'],
1227                     import_script=os_scripts['import'],
1228                     rename_script=os_scripts['rename'],
1229                     api_version=api_version)
1230
1231
1232 def GrowBlockDevice(disk, amount):
1233   """Grow a stack of block devices.
1234
1235   This function is called recursively, with the childrens being the
1236   first one resize.
1237
1238   Args:
1239     disk: the disk to be grown
1240
1241   Returns: a tuple of (status, result), with:
1242     status: the result (true/false) of the operation
1243     result: the error message if the operation failed, otherwise not used
1244
1245   """
1246   r_dev = _RecursiveFindBD(disk)
1247   if r_dev is None:
1248     return False, "Cannot find block device %s" % (disk,)
1249
1250   try:
1251     r_dev.Grow(amount)
1252   except errors.BlockDeviceError, err:
1253     return False, str(err)
1254
1255   return True, None
1256
1257
1258 def SnapshotBlockDevice(disk):
1259   """Create a snapshot copy of a block device.
1260
1261   This function is called recursively, and the snapshot is actually created
1262   just for the leaf lvm backend device.
1263
1264   Args:
1265     disk: the disk to be snapshotted
1266
1267   Returns:
1268     a config entry for the actual lvm device snapshotted.
1269
1270   """
1271   if disk.children:
1272     if len(disk.children) == 1:
1273       # only one child, let's recurse on it
1274       return SnapshotBlockDevice(disk.children[0])
1275     else:
1276       # more than one child, choose one that matches
1277       for child in disk.children:
1278         if child.size == disk.size:
1279           # return implies breaking the loop
1280           return SnapshotBlockDevice(child)
1281   elif disk.dev_type == constants.LD_LV:
1282     r_dev = _RecursiveFindBD(disk)
1283     if r_dev is not None:
1284       # let's stay on the safe side and ask for the full size, for now
1285       return r_dev.Snapshot(disk.size)
1286     else:
1287       return None
1288   else:
1289     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1290                                  " '%s' of type '%s'" %
1291                                  (disk.unique_id, disk.dev_type))
1292
1293
1294 def ExportSnapshot(disk, dest_node, instance):
1295   """Export a block device snapshot to a remote node.
1296
1297   Args:
1298     disk: the snapshot block device
1299     dest_node: the node to send the image to
1300     instance: instance being exported
1301
1302   Returns:
1303     True if successful, False otherwise.
1304
1305   """
1306   inst_os = OSFromDisk(instance.os)
1307   export_script = inst_os.export_script
1308
1309   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1310                                      instance.name, int(time.time()))
1311   if not os.path.exists(constants.LOG_OS_DIR):
1312     os.mkdir(constants.LOG_OS_DIR, 0750)
1313
1314   real_os_dev = _RecursiveFindBD(disk)
1315   if real_os_dev is None:
1316     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1317                                   str(disk))
1318   real_os_dev.Open()
1319
1320   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1321   destfile = disk.physical_id[1]
1322
1323   # the target command is built out of three individual commands,
1324   # which are joined by pipes; we check each individual command for
1325   # valid parameters
1326
1327   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1328                                export_script, instance.name,
1329                                real_os_dev.dev_path, logfile)
1330
1331   comprcmd = "gzip"
1332
1333   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1334                                 destdir, destdir, destfile)
1335   remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1336                                        destcmd)
1337
1338   # all commands have been checked, so we're safe to combine them
1339   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1340
1341   result = utils.RunCmd(command)
1342
1343   if result.failed:
1344     logging.error("os snapshot export command '%s' returned error: %s"
1345                   " output: %s", command, result.fail_reason, result.output)
1346     return False
1347
1348   return True
1349
1350
1351 def FinalizeExport(instance, snap_disks):
1352   """Write out the export configuration information.
1353
1354   Args:
1355     instance: instance configuration
1356     snap_disks: snapshot block devices
1357
1358   Returns:
1359     False in case of error, True otherwise.
1360
1361   """
1362   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1363   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1364
1365   config = objects.SerializableConfigParser()
1366
1367   config.add_section(constants.INISECT_EXP)
1368   config.set(constants.INISECT_EXP, 'version', '0')
1369   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1370   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1371   config.set(constants.INISECT_EXP, 'os', instance.os)
1372   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1373
1374   config.add_section(constants.INISECT_INS)
1375   config.set(constants.INISECT_INS, 'name', instance.name)
1376   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1377   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1378   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1379
1380   nic_count = 0
1381   for nic_count, nic in enumerate(instance.nics):
1382     config.set(constants.INISECT_INS, 'nic%d_mac' %
1383                nic_count, '%s' % nic.mac)
1384     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1385     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1386                '%s' % nic.bridge)
1387   # TODO: redundant: on load can read nics until it doesn't exist
1388   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1389
1390   disk_count = 0
1391   for disk_count, disk in enumerate(snap_disks):
1392     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1393                ('%s' % disk.iv_name))
1394     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1395                ('%s' % disk.physical_id[1]))
1396     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1397                ('%d' % disk.size))
1398   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1399
1400   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1401   cfo = open(cff, 'w')
1402   try:
1403     config.write(cfo)
1404   finally:
1405     cfo.close()
1406
1407   shutil.rmtree(finaldestdir, True)
1408   shutil.move(destdir, finaldestdir)
1409
1410   return True
1411
1412
1413 def ExportInfo(dest):
1414   """Get export configuration information.
1415
1416   Args:
1417     dest: directory containing the export
1418
1419   Returns:
1420     A serializable config file containing the export info.
1421
1422   """
1423   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1424
1425   config = objects.SerializableConfigParser()
1426   config.read(cff)
1427
1428   if (not config.has_section(constants.INISECT_EXP) or
1429       not config.has_section(constants.INISECT_INS)):
1430     return None
1431
1432   return config
1433
1434
1435 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1436   """Import an os image into an instance.
1437
1438   Args:
1439     instance: the instance object
1440     os_disk: the instance-visible name of the os device
1441     swap_disk: the instance-visible name of the swap device
1442     src_node: node holding the source image
1443     src_image: path to the source image on src_node
1444
1445   Returns:
1446     False in case of error, True otherwise.
1447
1448   """
1449   inst_os = OSFromDisk(instance.os)
1450   import_script = inst_os.import_script
1451
1452   os_device = instance.FindDisk(os_disk)
1453   if os_device is None:
1454     logging.error("Can't find this device-visible name '%s'", os_disk)
1455     return False
1456
1457   swap_device = instance.FindDisk(swap_disk)
1458   if swap_device is None:
1459     logging.error("Can't find this device-visible name '%s'", swap_disk)
1460     return False
1461
1462   real_os_dev = _RecursiveFindBD(os_device)
1463   if real_os_dev is None:
1464     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1465                                   str(os_device))
1466   real_os_dev.Open()
1467
1468   real_swap_dev = _RecursiveFindBD(swap_device)
1469   if real_swap_dev is None:
1470     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1471                                   str(swap_device))
1472   real_swap_dev.Open()
1473
1474   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1475                                         instance.name, int(time.time()))
1476   if not os.path.exists(constants.LOG_OS_DIR):
1477     os.mkdir(constants.LOG_OS_DIR, 0750)
1478
1479   destcmd = utils.BuildShellCmd('cat %s', src_image)
1480   remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
1481                                        destcmd)
1482
1483   comprcmd = "gunzip"
1484   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1485                                inst_os.path, import_script, instance.name,
1486                                real_os_dev.dev_path, real_swap_dev.dev_path,
1487                                logfile)
1488
1489   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1490   env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
1491
1492   result = utils.RunCmd(command, env=env)
1493
1494   if result.failed:
1495     logging.error("os import command '%s' returned error: %s"
1496                   " output: %s", command, result.fail_reason, result.output)
1497     return False
1498
1499   return True
1500
1501
1502 def ListExports():
1503   """Return a list of exports currently available on this machine.
1504
1505   """
1506   if os.path.isdir(constants.EXPORT_DIR):
1507     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1508   else:
1509     return []
1510
1511
1512 def RemoveExport(export):
1513   """Remove an existing export from the node.
1514
1515   Args:
1516     export: the name of the export to remove
1517
1518   Returns:
1519     False in case of error, True otherwise.
1520
1521   """
1522   target = os.path.join(constants.EXPORT_DIR, export)
1523
1524   shutil.rmtree(target)
1525   # TODO: catch some of the relevant exceptions and provide a pretty
1526   # error message if rmtree fails.
1527
1528   return True
1529
1530
1531 def RenameBlockDevices(devlist):
1532   """Rename a list of block devices.
1533
1534   The devlist argument is a list of tuples (disk, new_logical,
1535   new_physical). The return value will be a combined boolean result
1536   (True only if all renames succeeded).
1537
1538   """
1539   result = True
1540   for disk, unique_id in devlist:
1541     dev = _RecursiveFindBD(disk)
1542     if dev is None:
1543       result = False
1544       continue
1545     try:
1546       old_rpath = dev.dev_path
1547       dev.Rename(unique_id)
1548       new_rpath = dev.dev_path
1549       if old_rpath != new_rpath:
1550         DevCacheManager.RemoveCache(old_rpath)
1551         # FIXME: we should add the new cache information here, like:
1552         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1553         # but we don't have the owner here - maybe parse from existing
1554         # cache? for now, we only lose lvm data when we rename, which
1555         # is less critical than DRBD or MD
1556     except errors.BlockDeviceError, err:
1557       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1558       result = False
1559   return result
1560
1561
1562 def _TransformFileStorageDir(file_storage_dir):
1563   """Checks whether given file_storage_dir is valid.
1564
1565   Checks wheter the given file_storage_dir is within the cluster-wide
1566   default file_storage_dir stored in SimpleStore. Only paths under that
1567   directory are allowed.
1568
1569   Args:
1570     file_storage_dir: string with path
1571
1572   Returns:
1573     normalized file_storage_dir (string) if valid, None otherwise
1574
1575   """
1576   file_storage_dir = os.path.normpath(file_storage_dir)
1577   base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
1578   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1579       base_file_storage_dir):
1580     logging.error("file storage directory '%s' is not under base file"
1581                   " storage directory '%s'",
1582                   file_storage_dir, base_file_storage_dir)
1583     return None
1584   return file_storage_dir
1585
1586
1587 def CreateFileStorageDir(file_storage_dir):
1588   """Create file storage directory.
1589
1590   Args:
1591     file_storage_dir: string containing the path
1592
1593   Returns:
1594     tuple with first element a boolean indicating wheter dir
1595     creation was successful or not
1596
1597   """
1598   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1599   result = True,
1600   if not file_storage_dir:
1601     result = False,
1602   else:
1603     if os.path.exists(file_storage_dir):
1604       if not os.path.isdir(file_storage_dir):
1605         logging.error("'%s' is not a directory", file_storage_dir)
1606         result = False,
1607     else:
1608       try:
1609         os.makedirs(file_storage_dir, 0750)
1610       except OSError, err:
1611         logging.error("Cannot create file storage directory '%s': %s",
1612                       file_storage_dir, err)
1613         result = False,
1614   return result
1615
1616
1617 def RemoveFileStorageDir(file_storage_dir):
1618   """Remove file storage directory.
1619
1620   Remove it only if it's empty. If not log an error and return.
1621
1622   Args:
1623     file_storage_dir: string containing the path
1624
1625   Returns:
1626     tuple with first element a boolean indicating wheter dir
1627     removal was successful or not
1628
1629   """
1630   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1631   result = True,
1632   if not file_storage_dir:
1633     result = False,
1634   else:
1635     if os.path.exists(file_storage_dir):
1636       if not os.path.isdir(file_storage_dir):
1637         logging.error("'%s' is not a directory", file_storage_dir)
1638         result = False,
1639       # deletes dir only if empty, otherwise we want to return False
1640       try:
1641         os.rmdir(file_storage_dir)
1642       except OSError, err:
1643         logging.exception("Cannot remove file storage directory '%s'",
1644                           file_storage_dir)
1645         result = False,
1646   return result
1647
1648
1649 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1650   """Rename the file storage directory.
1651
1652   Args:
1653     old_file_storage_dir: string containing the old path
1654     new_file_storage_dir: string containing the new path
1655
1656   Returns:
1657     tuple with first element a boolean indicating wheter dir
1658     rename was successful or not
1659
1660   """
1661   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1662   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1663   result = True,
1664   if not old_file_storage_dir or not new_file_storage_dir:
1665     result = False,
1666   else:
1667     if not os.path.exists(new_file_storage_dir):
1668       if os.path.isdir(old_file_storage_dir):
1669         try:
1670           os.rename(old_file_storage_dir, new_file_storage_dir)
1671         except OSError, err:
1672           logging.exception("Cannot rename '%s' to '%s'",
1673                             old_file_storage_dir, new_file_storage_dir)
1674           result =  False,
1675       else:
1676         logging.error("'%s' is not a directory", old_file_storage_dir)
1677         result = False,
1678     else:
1679       if os.path.exists(old_file_storage_dir):
1680         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1681                       old_file_storage_dir, new_file_storage_dir)
1682         result = False,
1683   return result
1684
1685
1686 def _IsJobQueueFile(file_name):
1687   """Checks whether the given filename is in the queue directory.
1688
1689   """
1690   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1691   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1692
1693   if not result:
1694     logging.error("'%s' is not a file in the queue directory",
1695                   file_name)
1696
1697   return result
1698
1699
1700 def JobQueueUpdate(file_name, content):
1701   """Updates a file in the queue directory.
1702
1703   """
1704   if not _IsJobQueueFile(file_name):
1705     return False
1706
1707   # Write and replace the file atomically
1708   utils.WriteFile(file_name, data=content)
1709
1710   return True
1711
1712
1713 def JobQueuePurge():
1714   """Removes job queue files and archived jobs
1715
1716   """
1717   # The lock must not be removed, otherwise another process could create
1718   # it again.
1719   return _JobQueuePurge(keep_lock=True)
1720
1721
1722 def JobQueueRename(old, new):
1723   """Renames a job queue file.
1724
1725   """
1726   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1727     return False
1728
1729   os.rename(old, new)
1730
1731   return True
1732
1733
1734 def CloseBlockDevices(disks):
1735   """Closes the given block devices.
1736
1737   This means they will be switched to secondary mode (in case of DRBD).
1738
1739   """
1740   bdevs = []
1741   for cf in disks:
1742     rd = _RecursiveFindBD(cf)
1743     if rd is None:
1744       return (False, "Can't find device %s" % cf)
1745     bdevs.append(rd)
1746
1747   msg = []
1748   for rd in bdevs:
1749     try:
1750       rd.Close()
1751     except errors.BlockDeviceError, err:
1752       msg.append(str(err))
1753   if msg:
1754     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1755   else:
1756     return (True, "All devices secondary")
1757
1758
1759 class HooksRunner(object):
1760   """Hook runner.
1761
1762   This class is instantiated on the node side (ganeti-noded) and not on
1763   the master side.
1764
1765   """
1766   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1767
1768   def __init__(self, hooks_base_dir=None):
1769     """Constructor for hooks runner.
1770
1771     Args:
1772       - hooks_base_dir: if not None, this overrides the
1773         constants.HOOKS_BASE_DIR (useful for unittests)
1774
1775     """
1776     if hooks_base_dir is None:
1777       hooks_base_dir = constants.HOOKS_BASE_DIR
1778     self._BASE_DIR = hooks_base_dir
1779
1780   @staticmethod
1781   def ExecHook(script, env):
1782     """Exec one hook script.
1783
1784     Args:
1785      - script: the full path to the script
1786      - env: the environment with which to exec the script
1787
1788     """
1789     # exec the process using subprocess and log the output
1790     fdstdin = None
1791     try:
1792       fdstdin = open("/dev/null", "r")
1793       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1794                                stderr=subprocess.STDOUT, close_fds=True,
1795                                shell=False, cwd="/", env=env)
1796       output = ""
1797       try:
1798         output = child.stdout.read(4096)
1799         child.stdout.close()
1800       except EnvironmentError, err:
1801         output += "Hook script error: %s" % str(err)
1802
1803       while True:
1804         try:
1805           result = child.wait()
1806           break
1807         except EnvironmentError, err:
1808           if err.errno == errno.EINTR:
1809             continue
1810           raise
1811     finally:
1812       # try not to leak fds
1813       for fd in (fdstdin, ):
1814         if fd is not None:
1815           try:
1816             fd.close()
1817           except EnvironmentError, err:
1818             # just log the error
1819             #logging.exception("Error while closing fd %s", fd)
1820             pass
1821
1822     return result == 0, output
1823
1824   def RunHooks(self, hpath, phase, env):
1825     """Run the scripts in the hooks directory.
1826
1827     This method will not be usually overriden by child opcodes.
1828
1829     """
1830     if phase == constants.HOOKS_PHASE_PRE:
1831       suffix = "pre"
1832     elif phase == constants.HOOKS_PHASE_POST:
1833       suffix = "post"
1834     else:
1835       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1836     rr = []
1837
1838     subdir = "%s-%s.d" % (hpath, suffix)
1839     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1840     try:
1841       dir_contents = utils.ListVisibleFiles(dir_name)
1842     except OSError, err:
1843       # must log
1844       return rr
1845
1846     # we use the standard python sort order,
1847     # so 00name is the recommended naming scheme
1848     dir_contents.sort()
1849     for relname in dir_contents:
1850       fname = os.path.join(dir_name, relname)
1851       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1852           self.RE_MASK.match(relname) is not None):
1853         rrval = constants.HKR_SKIP
1854         output = ""
1855       else:
1856         result, output = self.ExecHook(fname, env)
1857         if not result:
1858           rrval = constants.HKR_FAIL
1859         else:
1860           rrval = constants.HKR_SUCCESS
1861       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1862
1863     return rr
1864
1865
1866 class IAllocatorRunner(object):
1867   """IAllocator runner.
1868
1869   This class is instantiated on the node side (ganeti-noded) and not on
1870   the master side.
1871
1872   """
1873   def Run(self, name, idata):
1874     """Run an iallocator script.
1875
1876     Return value: tuple of:
1877        - run status (one of the IARUN_ constants)
1878        - stdout
1879        - stderr
1880        - fail reason (as from utils.RunResult)
1881
1882     """
1883     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1884                                   os.path.isfile)
1885     if alloc_script is None:
1886       return (constants.IARUN_NOTFOUND, None, None, None)
1887
1888     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1889     try:
1890       os.write(fd, idata)
1891       os.close(fd)
1892       result = utils.RunCmd([alloc_script, fin_name])
1893       if result.failed:
1894         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1895                 result.fail_reason)
1896     finally:
1897       os.unlink(fin_name)
1898
1899     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1900
1901
1902 class DevCacheManager(object):
1903   """Simple class for managing a cache of block device information.
1904
1905   """
1906   _DEV_PREFIX = "/dev/"
1907   _ROOT_DIR = constants.BDEV_CACHE_DIR
1908
1909   @classmethod
1910   def _ConvertPath(cls, dev_path):
1911     """Converts a /dev/name path to the cache file name.
1912
1913     This replaces slashes with underscores and strips the /dev
1914     prefix. It then returns the full path to the cache file
1915
1916     """
1917     if dev_path.startswith(cls._DEV_PREFIX):
1918       dev_path = dev_path[len(cls._DEV_PREFIX):]
1919     dev_path = dev_path.replace("/", "_")
1920     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1921     return fpath
1922
1923   @classmethod
1924   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1925     """Updates the cache information for a given device.
1926
1927     """
1928     if dev_path is None:
1929       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1930       return
1931     fpath = cls._ConvertPath(dev_path)
1932     if on_primary:
1933       state = "primary"
1934     else:
1935       state = "secondary"
1936     if iv_name is None:
1937       iv_name = "not_visible"
1938     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1939     try:
1940       utils.WriteFile(fpath, data=fdata)
1941     except EnvironmentError, err:
1942       logging.exception("Can't update bdev cache for %s", dev_path)
1943
1944   @classmethod
1945   def RemoveCache(cls, dev_path):
1946     """Remove data for a dev_path.
1947
1948     """
1949     if dev_path is None:
1950       logging.error("DevCacheManager.RemoveCache got a None dev_path")
1951       return
1952     fpath = cls._ConvertPath(dev_path)
1953     try:
1954       utils.RemoveFile(fpath)
1955     except EnvironmentError, err:
1956       logging.exception("Can't update bdev cache for %s", dev_path)