jqueue: Log more information when running opcodes
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34
35 from ganeti import logger
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import ssh
39 from ganeti import hypervisor
40 from ganeti import constants
41 from ganeti import bdev
42 from ganeti import objects
43 from ganeti import ssconf
44
45
46 def _GetSshRunner():
47   return ssh.SshRunner()
48
49
50 def StartMaster():
51   """Activate local node as master node.
52
53   There are two needed steps for this:
54     - run the master script
55     - register the cron script
56
57   """
58   result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
59
60   if result.failed:
61     logger.Error("could not activate cluster interface with command %s,"
62                  " error: '%s'" % (result.cmd, result.output))
63     return False
64
65   return True
66
67
68 def StopMaster():
69   """Deactivate this node as master.
70
71   This runs the master stop script.
72
73   """
74   result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])
75
76   if result.failed:
77     logger.Error("could not deactivate cluster interface with command %s,"
78                  " error: '%s'" % (result.cmd, result.output))
79     return False
80
81   return True
82
83
84 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
85   """Joins this node to the cluster.
86
87   This does the following:
88       - updates the hostkeys of the machine (rsa and dsa)
89       - adds the ssh private key to the user
90       - adds the ssh public key to the users' authorized_keys file
91
92   """
93   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
94                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
95                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
96                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
97   for name, content, mode in sshd_keys:
98     utils.WriteFile(name, data=content, mode=mode)
99
100   try:
101     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
102                                                     mkdir=True)
103   except errors.OpExecError, err:
104     logger.Error("Error while processing user ssh files: %s" % err)
105     return False
106
107   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
108     utils.WriteFile(name, data=content, mode=0600)
109
110   utils.AddAuthorizedKey(auth_keys, sshpub)
111
112   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
113
114   return True
115
116
117 def LeaveCluster():
118   """Cleans up the current node and prepares it to be removed from the cluster.
119
120   """
121   if os.path.isdir(constants.DATA_DIR):
122     for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
123       full_name = os.path.join(constants.DATA_DIR, rel_name)
124       if os.path.isfile(full_name) and not os.path.islink(full_name):
125         utils.RemoveFile(full_name)
126
127   try:
128     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
129   except errors.OpExecError, err:
130     logger.Error("Error while processing ssh files: %s" % err)
131     return
132
133   f = open(pub_key, 'r')
134   try:
135     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
136   finally:
137     f.close()
138
139   utils.RemoveFile(priv_key)
140   utils.RemoveFile(pub_key)
141
142   # Return a reassuring string to the caller, and quit
143   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
144
145
146 def GetNodeInfo(vgname):
147   """Gives back a hash with different informations about the node.
148
149   Returns:
150     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
151       'memory_free' : xxx, 'memory_total' : xxx }
152     where
153     vg_size is the size of the configured volume group in MiB
154     vg_free is the free size of the volume group in MiB
155     memory_dom0 is the memory allocated for domain0 in MiB
156     memory_free is the currently available (free) ram in MiB
157     memory_total is the total number of ram in MiB
158
159   """
160   outputarray = {}
161   vginfo = _GetVGInfo(vgname)
162   outputarray['vg_size'] = vginfo['vg_size']
163   outputarray['vg_free'] = vginfo['vg_free']
164
165   hyper = hypervisor.GetHypervisor()
166   hyp_info = hyper.GetNodeInfo()
167   if hyp_info is not None:
168     outputarray.update(hyp_info)
169
170   f = open("/proc/sys/kernel/random/boot_id", 'r')
171   try:
172     outputarray["bootid"] = f.read(128).rstrip("\n")
173   finally:
174     f.close()
175
176   return outputarray
177
178
179 def VerifyNode(what):
180   """Verify the status of the local node.
181
182   Args:
183     what - a dictionary of things to check:
184       'filelist' : list of files for which to compute checksums
185       'nodelist' : list of nodes we should check communication with
186       'hypervisor': run the hypervisor-specific verify
187
188   Requested files on local node are checksummed and the result returned.
189
190   The nodelist is traversed, with the following checks being made
191   for each node:
192   - known_hosts key correct
193   - correct resolving of node name (target node returns its own hostname
194     by ssh-execution of 'hostname', result compared against name in list.
195
196   """
197   result = {}
198
199   if 'hypervisor' in what:
200     result['hypervisor'] = hypervisor.GetHypervisor().Verify()
201
202   if 'filelist' in what:
203     result['filelist'] = utils.FingerprintFiles(what['filelist'])
204
205   if 'nodelist' in what:
206     result['nodelist'] = {}
207     random.shuffle(what['nodelist'])
208     for node in what['nodelist']:
209       success, message = _GetSshRunner().VerifyNodeHostname(node)
210       if not success:
211         result['nodelist'][node] = message
212   if 'node-net-test' in what:
213     result['node-net-test'] = {}
214     my_name = utils.HostInfo().name
215     my_pip = my_sip = None
216     for name, pip, sip in what['node-net-test']:
217       if name == my_name:
218         my_pip = pip
219         my_sip = sip
220         break
221     if not my_pip:
222       result['node-net-test'][my_name] = ("Can't find my own"
223                                           " primary/secondary IP"
224                                           " in the node list")
225     else:
226       port = ssconf.SimpleStore().GetNodeDaemonPort()
227       for name, pip, sip in what['node-net-test']:
228         fail = []
229         if not utils.TcpPing(pip, port, source=my_pip):
230           fail.append("primary")
231         if sip != pip:
232           if not utils.TcpPing(sip, port, source=my_sip):
233             fail.append("secondary")
234         if fail:
235           result['node-net-test'][name] = ("failure using the %s"
236                                            " interface(s)" %
237                                            " and ".join(fail))
238
239   return result
240
241
242 def GetVolumeList(vg_name):
243   """Compute list of logical volumes and their size.
244
245   Returns:
246     dictionary of all partions (key) with their size (in MiB), inactive
247     and online status:
248     {'test1': ('20.06', True, True)}
249
250   """
251   lvs = {}
252   sep = '|'
253   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
254                          "--separator=%s" % sep,
255                          "-olv_name,lv_size,lv_attr", vg_name])
256   if result.failed:
257     logger.Error("Failed to list logical volumes, lvs output: %s" %
258                  result.output)
259     return result.output
260
261   for line in result.stdout.splitlines():
262     line = line.strip().rstrip(sep)
263     name, size, attr = line.split(sep)
264     if len(attr) != 6:
265       attr = '------'
266     inactive = attr[4] == '-'
267     online = attr[5] == 'o'
268     lvs[name] = (size, inactive, online)
269
270   return lvs
271
272
273 def ListVolumeGroups():
274   """List the volume groups and their size.
275
276   Returns:
277     Dictionary with keys volume name and values the size of the volume
278
279   """
280   return utils.ListVolumeGroups()
281
282
283 def NodeVolumes():
284   """List all volumes on this node.
285
286   """
287   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
288                          "--separator=|",
289                          "--options=lv_name,lv_size,devices,vg_name"])
290   if result.failed:
291     logger.Error("Failed to list logical volumes, lvs output: %s" %
292                  result.output)
293     return {}
294
295   def parse_dev(dev):
296     if '(' in dev:
297       return dev.split('(')[0]
298     else:
299       return dev
300
301   def map_line(line):
302     return {
303       'name': line[0].strip(),
304       'size': line[1].strip(),
305       'dev': parse_dev(line[2].strip()),
306       'vg': line[3].strip(),
307     }
308
309   return [map_line(line.split('|')) for line in result.stdout.splitlines()]
310
311
312 def BridgesExist(bridges_list):
313   """Check if a list of bridges exist on the current node.
314
315   Returns:
316     True if all of them exist, false otherwise
317
318   """
319   for bridge in bridges_list:
320     if not utils.BridgeExists(bridge):
321       return False
322
323   return True
324
325
326 def GetInstanceList():
327   """Provides a list of instances.
328
329   Returns:
330     A list of all running instances on the current node
331     - instance1.example.com
332     - instance2.example.com
333
334   """
335   try:
336     names = hypervisor.GetHypervisor().ListInstances()
337   except errors.HypervisorError, err:
338     logger.Error("error enumerating instances: %s" % str(err))
339     raise
340
341   return names
342
343
344 def GetInstanceInfo(instance):
345   """Gives back the informations about an instance as a dictionary.
346
347   Args:
348     instance: name of the instance (ex. instance1.example.com)
349
350   Returns:
351     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
352     where
353     memory: memory size of instance (int)
354     state: xen state of instance (string)
355     time: cpu time of instance (float)
356
357   """
358   output = {}
359
360   iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
361   if iinfo is not None:
362     output['memory'] = iinfo[2]
363     output['state'] = iinfo[4]
364     output['time'] = iinfo[5]
365
366   return output
367
368
369 def GetAllInstancesInfo():
370   """Gather data about all instances.
371
372   This is the equivalent of `GetInstanceInfo()`, except that it
373   computes data for all instances at once, thus being faster if one
374   needs data about more than one instance.
375
376   Returns: a dictionary of dictionaries, keys being the instance name,
377     and with values:
378     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
379     where
380     memory: memory size of instance (int)
381     state: xen state of instance (string)
382     time: cpu time of instance (float)
383     vcpus: the number of cpus
384
385   """
386   output = {}
387
388   iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
389   if iinfo:
390     for name, inst_id, memory, vcpus, state, times in iinfo:
391       output[name] = {
392         'memory': memory,
393         'vcpus': vcpus,
394         'state': state,
395         'time': times,
396         }
397
398   return output
399
400
401 def AddOSToInstance(instance, os_disk, swap_disk):
402   """Add an OS to an instance.
403
404   Args:
405     instance: the instance object
406     os_disk: the instance-visible name of the os device
407     swap_disk: the instance-visible name of the swap device
408
409   """
410   inst_os = OSFromDisk(instance.os)
411
412   create_script = inst_os.create_script
413
414   os_device = instance.FindDisk(os_disk)
415   if os_device is None:
416     logger.Error("Can't find this device-visible name '%s'" % os_disk)
417     return False
418
419   swap_device = instance.FindDisk(swap_disk)
420   if swap_device is None:
421     logger.Error("Can't find this device-visible name '%s'" % swap_disk)
422     return False
423
424   real_os_dev = _RecursiveFindBD(os_device)
425   if real_os_dev is None:
426     raise errors.BlockDeviceError("Block device '%s' is not set up" %
427                                   str(os_device))
428   real_os_dev.Open()
429
430   real_swap_dev = _RecursiveFindBD(swap_device)
431   if real_swap_dev is None:
432     raise errors.BlockDeviceError("Block device '%s' is not set up" %
433                                   str(swap_device))
434   real_swap_dev.Open()
435
436   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
437                                      instance.name, int(time.time()))
438   if not os.path.exists(constants.LOG_OS_DIR):
439     os.mkdir(constants.LOG_OS_DIR, 0750)
440
441   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
442                                 inst_os.path, create_script, instance.name,
443                                 real_os_dev.dev_path, real_swap_dev.dev_path,
444                                 logfile)
445
446   result = utils.RunCmd(command)
447   if result.failed:
448     logger.Error("os create command '%s' returned error: %s, logfile: %s,"
449                  " output: %s" %
450                  (command, result.fail_reason, logfile, result.output))
451     return False
452
453   return True
454
455
456 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
457   """Run the OS rename script for an instance.
458
459   Args:
460     instance: the instance object
461     old_name: the old name of the instance
462     os_disk: the instance-visible name of the os device
463     swap_disk: the instance-visible name of the swap device
464
465   """
466   inst_os = OSFromDisk(instance.os)
467
468   script = inst_os.rename_script
469
470   os_device = instance.FindDisk(os_disk)
471   if os_device is None:
472     logger.Error("Can't find this device-visible name '%s'" % os_disk)
473     return False
474
475   swap_device = instance.FindDisk(swap_disk)
476   if swap_device is None:
477     logger.Error("Can't find this device-visible name '%s'" % swap_disk)
478     return False
479
480   real_os_dev = _RecursiveFindBD(os_device)
481   if real_os_dev is None:
482     raise errors.BlockDeviceError("Block device '%s' is not set up" %
483                                   str(os_device))
484   real_os_dev.Open()
485
486   real_swap_dev = _RecursiveFindBD(swap_device)
487   if real_swap_dev is None:
488     raise errors.BlockDeviceError("Block device '%s' is not set up" %
489                                   str(swap_device))
490   real_swap_dev.Open()
491
492   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
493                                            old_name,
494                                            instance.name, int(time.time()))
495   if not os.path.exists(constants.LOG_OS_DIR):
496     os.mkdir(constants.LOG_OS_DIR, 0750)
497
498   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
499                                 inst_os.path, script, old_name, instance.name,
500                                 real_os_dev.dev_path, real_swap_dev.dev_path,
501                                 logfile)
502
503   result = utils.RunCmd(command)
504
505   if result.failed:
506     logger.Error("os create command '%s' returned error: %s"
507                  " output: %s" %
508                  (command, result.fail_reason, result.output))
509     return False
510
511   return True
512
513
514 def _GetVGInfo(vg_name):
515   """Get informations about the volume group.
516
517   Args:
518     vg_name: the volume group
519
520   Returns:
521     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
522     where
523     vg_size is the total size of the volume group in MiB
524     vg_free is the free size of the volume group in MiB
525     pv_count are the number of physical disks in that vg
526
527   If an error occurs during gathering of data, we return the same dict
528   with keys all set to None.
529
530   """
531   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
532
533   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
534                          "--nosuffix", "--units=m", "--separator=:", vg_name])
535
536   if retval.failed:
537     errmsg = "volume group %s not present" % vg_name
538     logger.Error(errmsg)
539     return retdic
540   valarr = retval.stdout.strip().rstrip(':').split(':')
541   if len(valarr) == 3:
542     try:
543       retdic = {
544         "vg_size": int(round(float(valarr[0]), 0)),
545         "vg_free": int(round(float(valarr[1]), 0)),
546         "pv_count": int(valarr[2]),
547         }
548     except ValueError, err:
549       logger.Error("Fail to parse vgs output: %s" % str(err))
550   else:
551     logger.Error("vgs output has the wrong number of fields (expected"
552                  " three): %s" % str(valarr))
553   return retdic
554
555
556 def _GatherBlockDevs(instance):
557   """Set up an instance's block device(s).
558
559   This is run on the primary node at instance startup. The block
560   devices must be already assembled.
561
562   """
563   block_devices = []
564   for disk in instance.disks:
565     device = _RecursiveFindBD(disk)
566     if device is None:
567       raise errors.BlockDeviceError("Block device '%s' is not set up." %
568                                     str(disk))
569     device.Open()
570     block_devices.append((disk, device))
571   return block_devices
572
573
574 def StartInstance(instance, extra_args):
575   """Start an instance.
576
577   Args:
578     instance - name of instance to start.
579
580   """
581   running_instances = GetInstanceList()
582
583   if instance.name in running_instances:
584     return True
585
586   block_devices = _GatherBlockDevs(instance)
587   hyper = hypervisor.GetHypervisor()
588
589   try:
590     hyper.StartInstance(instance, block_devices, extra_args)
591   except errors.HypervisorError, err:
592     logger.Error("Failed to start instance: %s" % err)
593     return False
594
595   return True
596
597
598 def ShutdownInstance(instance):
599   """Shut an instance down.
600
601   Args:
602     instance - name of instance to shutdown.
603
604   """
605   running_instances = GetInstanceList()
606
607   if instance.name not in running_instances:
608     return True
609
610   hyper = hypervisor.GetHypervisor()
611   try:
612     hyper.StopInstance(instance)
613   except errors.HypervisorError, err:
614     logger.Error("Failed to stop instance: %s" % err)
615     return False
616
617   # test every 10secs for 2min
618   shutdown_ok = False
619
620   time.sleep(1)
621   for dummy in range(11):
622     if instance.name not in GetInstanceList():
623       break
624     time.sleep(10)
625   else:
626     # the shutdown did not succeed
627     logger.Error("shutdown of '%s' unsuccessful, using destroy" % instance)
628
629     try:
630       hyper.StopInstance(instance, force=True)
631     except errors.HypervisorError, err:
632       logger.Error("Failed to stop instance: %s" % err)
633       return False
634
635     time.sleep(1)
636     if instance.name in GetInstanceList():
637       logger.Error("could not shutdown instance '%s' even by destroy")
638       return False
639
640   return True
641
642
643 def RebootInstance(instance, reboot_type, extra_args):
644   """Reboot an instance.
645
646   Args:
647     instance    - name of instance to reboot
648     reboot_type - how to reboot [soft,hard,full]
649
650   """
651   running_instances = GetInstanceList()
652
653   if instance.name not in running_instances:
654     logger.Error("Cannot reboot instance that is not running")
655     return False
656
657   hyper = hypervisor.GetHypervisor()
658   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
659     try:
660       hyper.RebootInstance(instance)
661     except errors.HypervisorError, err:
662       logger.Error("Failed to soft reboot instance: %s" % err)
663       return False
664   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
665     try:
666       ShutdownInstance(instance)
667       StartInstance(instance, extra_args)
668     except errors.HypervisorError, err:
669       logger.Error("Failed to hard reboot instance: %s" % err)
670       return False
671   else:
672     raise errors.ParameterError("reboot_type invalid")
673
674
675   return True
676
677
678 def MigrateInstance(instance, target, live):
679   """Migrates an instance to another node.
680
681   """
682   hyper = hypervisor.GetHypervisor()
683
684   try:
685     hyper.MigrateInstance(instance, target, live)
686   except errors.HypervisorError, err:
687     msg = "Failed to migrate instance: %s" % str(err)
688     logger.Error(msg)
689     return (False, msg)
690   return (True, "Migration successfull")
691
692
693 def CreateBlockDevice(disk, size, owner, on_primary, info):
694   """Creates a block device for an instance.
695
696   Args:
697    disk: a ganeti.objects.Disk object
698    size: the size of the physical underlying device
699    owner: a string with the name of the instance
700    on_primary: a boolean indicating if it is the primary node or not
701    info: string that will be sent to the physical device creation
702
703   Returns:
704     the new unique_id of the device (this can sometime be
705     computed only after creation), or None. On secondary nodes,
706     it's not required to return anything.
707
708   """
709   clist = []
710   if disk.children:
711     for child in disk.children:
712       crdev = _RecursiveAssembleBD(child, owner, on_primary)
713       if on_primary or disk.AssembleOnSecondary():
714         # we need the children open in case the device itself has to
715         # be assembled
716         crdev.Open()
717       clist.append(crdev)
718   try:
719     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
720     if device is not None:
721       logger.Info("removing existing device %s" % disk)
722       device.Remove()
723   except errors.BlockDeviceError, err:
724     pass
725
726   device = bdev.Create(disk.dev_type, disk.physical_id,
727                        clist, size)
728   if device is None:
729     raise ValueError("Can't create child device for %s, %s" %
730                      (disk, size))
731   if on_primary or disk.AssembleOnSecondary():
732     if not device.Assemble():
733       errorstring = "Can't assemble device after creation"
734       logger.Error(errorstring)
735       raise errors.BlockDeviceError("%s, very unusual event - check the node"
736                                     " daemon logs" % errorstring)
737     device.SetSyncSpeed(constants.SYNC_SPEED)
738     if on_primary or disk.OpenOnSecondary():
739       device.Open(force=True)
740     DevCacheManager.UpdateCache(device.dev_path, owner,
741                                 on_primary, disk.iv_name)
742
743   device.SetInfo(info)
744
745   physical_id = device.unique_id
746   return physical_id
747
748
749 def RemoveBlockDevice(disk):
750   """Remove a block device.
751
752   This is intended to be called recursively.
753
754   """
755   try:
756     # since we are removing the device, allow a partial match
757     # this allows removal of broken mirrors
758     rdev = _RecursiveFindBD(disk, allow_partial=True)
759   except errors.BlockDeviceError, err:
760     # probably can't attach
761     logger.Info("Can't attach to device %s in remove" % disk)
762     rdev = None
763   if rdev is not None:
764     r_path = rdev.dev_path
765     result = rdev.Remove()
766     if result:
767       DevCacheManager.RemoveCache(r_path)
768   else:
769     result = True
770   if disk.children:
771     for child in disk.children:
772       result = result and RemoveBlockDevice(child)
773   return result
774
775
776 def _RecursiveAssembleBD(disk, owner, as_primary):
777   """Activate a block device for an instance.
778
779   This is run on the primary and secondary nodes for an instance.
780
781   This function is called recursively.
782
783   Args:
784     disk: a objects.Disk object
785     as_primary: if we should make the block device read/write
786
787   Returns:
788     the assembled device or None (in case no device was assembled)
789
790   If the assembly is not successful, an exception is raised.
791
792   """
793   children = []
794   if disk.children:
795     mcn = disk.ChildrenNeeded()
796     if mcn == -1:
797       mcn = 0 # max number of Nones allowed
798     else:
799       mcn = len(disk.children) - mcn # max number of Nones
800     for chld_disk in disk.children:
801       try:
802         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
803       except errors.BlockDeviceError, err:
804         if children.count(None) >= mcn:
805           raise
806         cdev = None
807         logger.Debug("Error in child activation: %s" % str(err))
808       children.append(cdev)
809
810   if as_primary or disk.AssembleOnSecondary():
811     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
812     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
813     result = r_dev
814     if as_primary or disk.OpenOnSecondary():
815       r_dev.Open()
816     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
817                                 as_primary, disk.iv_name)
818
819   else:
820     result = True
821   return result
822
823
824 def AssembleBlockDevice(disk, owner, as_primary):
825   """Activate a block device for an instance.
826
827   This is a wrapper over _RecursiveAssembleBD.
828
829   Returns:
830     a /dev path for primary nodes
831     True for secondary nodes
832
833   """
834   result = _RecursiveAssembleBD(disk, owner, as_primary)
835   if isinstance(result, bdev.BlockDev):
836     result = result.dev_path
837   return result
838
839
840 def ShutdownBlockDevice(disk):
841   """Shut down a block device.
842
843   First, if the device is assembled (can `Attach()`), then the device
844   is shutdown. Then the children of the device are shutdown.
845
846   This function is called recursively. Note that we don't cache the
847   children or such, as oppossed to assemble, shutdown of different
848   devices doesn't require that the upper device was active.
849
850   """
851   r_dev = _RecursiveFindBD(disk)
852   if r_dev is not None:
853     r_path = r_dev.dev_path
854     result = r_dev.Shutdown()
855     if result:
856       DevCacheManager.RemoveCache(r_path)
857   else:
858     result = True
859   if disk.children:
860     for child in disk.children:
861       result = result and ShutdownBlockDevice(child)
862   return result
863
864
865 def MirrorAddChildren(parent_cdev, new_cdevs):
866   """Extend a mirrored block device.
867
868   """
869   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
870   if parent_bdev is None:
871     logger.Error("Can't find parent device")
872     return False
873   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
874   if new_bdevs.count(None) > 0:
875     logger.Error("Can't find new device(s) to add: %s:%s" %
876                  (new_bdevs, new_cdevs))
877     return False
878   parent_bdev.AddChildren(new_bdevs)
879   return True
880
881
882 def MirrorRemoveChildren(parent_cdev, new_cdevs):
883   """Shrink a mirrored block device.
884
885   """
886   parent_bdev = _RecursiveFindBD(parent_cdev)
887   if parent_bdev is None:
888     logger.Error("Can't find parent in remove children: %s" % parent_cdev)
889     return False
890   devs = []
891   for disk in new_cdevs:
892     rpath = disk.StaticDevPath()
893     if rpath is None:
894       bd = _RecursiveFindBD(disk)
895       if bd is None:
896         logger.Error("Can't find dynamic device %s while removing children" %
897                      disk)
898         return False
899       else:
900         devs.append(bd.dev_path)
901     else:
902       devs.append(rpath)
903   parent_bdev.RemoveChildren(devs)
904   return True
905
906
907 def GetMirrorStatus(disks):
908   """Get the mirroring status of a list of devices.
909
910   Args:
911     disks: list of `objects.Disk`
912
913   Returns:
914     list of (mirror_done, estimated_time) tuples, which
915     are the result of bdev.BlockDevice.CombinedSyncStatus()
916
917   """
918   stats = []
919   for dsk in disks:
920     rbd = _RecursiveFindBD(dsk)
921     if rbd is None:
922       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
923     stats.append(rbd.CombinedSyncStatus())
924   return stats
925
926
927 def _RecursiveFindBD(disk, allow_partial=False):
928   """Check if a device is activated.
929
930   If so, return informations about the real device.
931
932   Args:
933     disk: the objects.Disk instance
934     allow_partial: don't abort the find if a child of the
935                    device can't be found; this is intended to be
936                    used when repairing mirrors
937
938   Returns:
939     None if the device can't be found
940     otherwise the device instance
941
942   """
943   children = []
944   if disk.children:
945     for chdisk in disk.children:
946       children.append(_RecursiveFindBD(chdisk))
947
948   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
949
950
951 def FindBlockDevice(disk):
952   """Check if a device is activated.
953
954   If so, return informations about the real device.
955
956   Args:
957     disk: the objects.Disk instance
958   Returns:
959     None if the device can't be found
960     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
961
962   """
963   rbd = _RecursiveFindBD(disk)
964   if rbd is None:
965     return rbd
966   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
967
968
969 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
970   """Write a file to the filesystem.
971
972   This allows the master to overwrite(!) a file. It will only perform
973   the operation if the file belongs to a list of configuration files.
974
975   """
976   if not os.path.isabs(file_name):
977     logger.Error("Filename passed to UploadFile is not absolute: '%s'" %
978                  file_name)
979     return False
980
981   allowed_files = [
982     constants.CLUSTER_CONF_FILE,
983     constants.ETC_HOSTS,
984     constants.SSH_KNOWN_HOSTS_FILE,
985     constants.VNC_PASSWORD_FILE,
986     ]
987   allowed_files.extend(ssconf.SimpleStore().GetFileList())
988   if file_name not in allowed_files:
989     logger.Error("Filename passed to UploadFile not in allowed"
990                  " upload targets: '%s'" % file_name)
991     return False
992
993   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
994                   atime=atime, mtime=mtime)
995   return True
996
997
998 def _ErrnoOrStr(err):
999   """Format an EnvironmentError exception.
1000
1001   If the `err` argument has an errno attribute, it will be looked up
1002   and converted into a textual EXXXX description. Otherwise the string
1003   representation of the error will be returned.
1004
1005   """
1006   if hasattr(err, 'errno'):
1007     detail = errno.errorcode[err.errno]
1008   else:
1009     detail = str(err)
1010   return detail
1011
1012
1013 def _OSOndiskVersion(name, os_dir):
1014   """Compute and return the API version of a given OS.
1015
1016   This function will try to read the API version of the os given by
1017   the 'name' parameter and residing in the 'os_dir' directory.
1018
1019   Return value will be either an integer denoting the version or None in the
1020   case when this is not a valid OS name.
1021
1022   """
1023   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1024
1025   try:
1026     st = os.stat(api_file)
1027   except EnvironmentError, err:
1028     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1029                            " found (%s)" % _ErrnoOrStr(err))
1030
1031   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1032     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1033                            " a regular file")
1034
1035   try:
1036     f = open(api_file)
1037     try:
1038       api_version = f.read(256)
1039     finally:
1040       f.close()
1041   except EnvironmentError, err:
1042     raise errors.InvalidOS(name, os_dir, "error while reading the"
1043                            " API version (%s)" % _ErrnoOrStr(err))
1044
1045   api_version = api_version.strip()
1046   try:
1047     api_version = int(api_version)
1048   except (TypeError, ValueError), err:
1049     raise errors.InvalidOS(name, os_dir,
1050                            "API version is not integer (%s)" % str(err))
1051
1052   return api_version
1053
1054
1055 def DiagnoseOS(top_dirs=None):
1056   """Compute the validity for all OSes.
1057
1058   Returns an OS object for each name in all the given top directories
1059   (if not given defaults to constants.OS_SEARCH_PATH)
1060
1061   Returns:
1062     list of OS objects
1063
1064   """
1065   if top_dirs is None:
1066     top_dirs = constants.OS_SEARCH_PATH
1067
1068   result = []
1069   for dir_name in top_dirs:
1070     if os.path.isdir(dir_name):
1071       try:
1072         f_names = utils.ListVisibleFiles(dir_name)
1073       except EnvironmentError, err:
1074         logger.Error("Can't list the OS directory %s: %s" %
1075                      (dir_name, str(err)))
1076         break
1077       for name in f_names:
1078         try:
1079           os_inst = OSFromDisk(name, base_dir=dir_name)
1080           result.append(os_inst)
1081         except errors.InvalidOS, err:
1082           result.append(objects.OS.FromInvalidOS(err))
1083
1084   return result
1085
1086
1087 def OSFromDisk(name, base_dir=None):
1088   """Create an OS instance from disk.
1089
1090   This function will return an OS instance if the given name is a
1091   valid OS name. Otherwise, it will raise an appropriate
1092   `errors.InvalidOS` exception, detailing why this is not a valid
1093   OS.
1094
1095   Args:
1096     os_dir: Directory containing the OS scripts. Defaults to a search
1097             in all the OS_SEARCH_PATH directories.
1098
1099   """
1100
1101   if base_dir is None:
1102     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1103     if os_dir is None:
1104       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1105   else:
1106     os_dir = os.path.sep.join([base_dir, name])
1107
1108   api_version = _OSOndiskVersion(name, os_dir)
1109
1110   if api_version != constants.OS_API_VERSION:
1111     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1112                            " (found %s want %s)"
1113                            % (api_version, constants.OS_API_VERSION))
1114
1115   # OS Scripts dictionary, we will populate it with the actual script names
1116   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1117
1118   for script in os_scripts:
1119     os_scripts[script] = os.path.sep.join([os_dir, script])
1120
1121     try:
1122       st = os.stat(os_scripts[script])
1123     except EnvironmentError, err:
1124       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1125                              (script, _ErrnoOrStr(err)))
1126
1127     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1128       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1129                              script)
1130
1131     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1132       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1133                              script)
1134
1135
1136   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1137                     create_script=os_scripts['create'],
1138                     export_script=os_scripts['export'],
1139                     import_script=os_scripts['import'],
1140                     rename_script=os_scripts['rename'],
1141                     api_version=api_version)
1142
1143
1144 def GrowBlockDevice(disk, amount):
1145   """Grow a stack of block devices.
1146
1147   This function is called recursively, with the childrens being the
1148   first one resize.
1149
1150   Args:
1151     disk: the disk to be grown
1152
1153   Returns: a tuple of (status, result), with:
1154     status: the result (true/false) of the operation
1155     result: the error message if the operation failed, otherwise not used
1156
1157   """
1158   r_dev = _RecursiveFindBD(disk)
1159   if r_dev is None:
1160     return False, "Cannot find block device %s" % (disk,)
1161
1162   try:
1163     r_dev.Grow(amount)
1164   except errors.BlockDeviceError, err:
1165     return False, str(err)
1166
1167   return True, None
1168
1169
1170 def SnapshotBlockDevice(disk):
1171   """Create a snapshot copy of a block device.
1172
1173   This function is called recursively, and the snapshot is actually created
1174   just for the leaf lvm backend device.
1175
1176   Args:
1177     disk: the disk to be snapshotted
1178
1179   Returns:
1180     a config entry for the actual lvm device snapshotted.
1181
1182   """
1183   if disk.children:
1184     if len(disk.children) == 1:
1185       # only one child, let's recurse on it
1186       return SnapshotBlockDevice(disk.children[0])
1187     else:
1188       # more than one child, choose one that matches
1189       for child in disk.children:
1190         if child.size == disk.size:
1191           # return implies breaking the loop
1192           return SnapshotBlockDevice(child)
1193   elif disk.dev_type == constants.LD_LV:
1194     r_dev = _RecursiveFindBD(disk)
1195     if r_dev is not None:
1196       # let's stay on the safe side and ask for the full size, for now
1197       return r_dev.Snapshot(disk.size)
1198     else:
1199       return None
1200   else:
1201     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1202                                  " '%s' of type '%s'" %
1203                                  (disk.unique_id, disk.dev_type))
1204
1205
1206 def ExportSnapshot(disk, dest_node, instance):
1207   """Export a block device snapshot to a remote node.
1208
1209   Args:
1210     disk: the snapshot block device
1211     dest_node: the node to send the image to
1212     instance: instance being exported
1213
1214   Returns:
1215     True if successful, False otherwise.
1216
1217   """
1218   inst_os = OSFromDisk(instance.os)
1219   export_script = inst_os.export_script
1220
1221   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1222                                      instance.name, int(time.time()))
1223   if not os.path.exists(constants.LOG_OS_DIR):
1224     os.mkdir(constants.LOG_OS_DIR, 0750)
1225
1226   real_os_dev = _RecursiveFindBD(disk)
1227   if real_os_dev is None:
1228     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1229                                   str(disk))
1230   real_os_dev.Open()
1231
1232   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1233   destfile = disk.physical_id[1]
1234
1235   # the target command is built out of three individual commands,
1236   # which are joined by pipes; we check each individual command for
1237   # valid parameters
1238
1239   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1240                                export_script, instance.name,
1241                                real_os_dev.dev_path, logfile)
1242
1243   comprcmd = "gzip"
1244
1245   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1246                                 destdir, destdir, destfile)
1247   remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1248                                        destcmd)
1249
1250   # all commands have been checked, so we're safe to combine them
1251   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1252
1253   result = utils.RunCmd(command)
1254
1255   if result.failed:
1256     logger.Error("os snapshot export command '%s' returned error: %s"
1257                  " output: %s" %
1258                  (command, result.fail_reason, result.output))
1259     return False
1260
1261   return True
1262
1263
1264 def FinalizeExport(instance, snap_disks):
1265   """Write out the export configuration information.
1266
1267   Args:
1268     instance: instance configuration
1269     snap_disks: snapshot block devices
1270
1271   Returns:
1272     False in case of error, True otherwise.
1273
1274   """
1275   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1276   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1277
1278   config = objects.SerializableConfigParser()
1279
1280   config.add_section(constants.INISECT_EXP)
1281   config.set(constants.INISECT_EXP, 'version', '0')
1282   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1283   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1284   config.set(constants.INISECT_EXP, 'os', instance.os)
1285   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1286
1287   config.add_section(constants.INISECT_INS)
1288   config.set(constants.INISECT_INS, 'name', instance.name)
1289   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1290   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1291   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1292
1293   nic_count = 0
1294   for nic_count, nic in enumerate(instance.nics):
1295     config.set(constants.INISECT_INS, 'nic%d_mac' %
1296                nic_count, '%s' % nic.mac)
1297     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1298     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
1299   # TODO: redundant: on load can read nics until it doesn't exist
1300   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1301
1302   disk_count = 0
1303   for disk_count, disk in enumerate(snap_disks):
1304     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1305                ('%s' % disk.iv_name))
1306     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1307                ('%s' % disk.physical_id[1]))
1308     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1309                ('%d' % disk.size))
1310   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1311
1312   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1313   cfo = open(cff, 'w')
1314   try:
1315     config.write(cfo)
1316   finally:
1317     cfo.close()
1318
1319   shutil.rmtree(finaldestdir, True)
1320   shutil.move(destdir, finaldestdir)
1321
1322   return True
1323
1324
1325 def ExportInfo(dest):
1326   """Get export configuration information.
1327
1328   Args:
1329     dest: directory containing the export
1330
1331   Returns:
1332     A serializable config file containing the export info.
1333
1334   """
1335   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1336
1337   config = objects.SerializableConfigParser()
1338   config.read(cff)
1339
1340   if (not config.has_section(constants.INISECT_EXP) or
1341       not config.has_section(constants.INISECT_INS)):
1342     return None
1343
1344   return config
1345
1346
1347 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1348   """Import an os image into an instance.
1349
1350   Args:
1351     instance: the instance object
1352     os_disk: the instance-visible name of the os device
1353     swap_disk: the instance-visible name of the swap device
1354     src_node: node holding the source image
1355     src_image: path to the source image on src_node
1356
1357   Returns:
1358     False in case of error, True otherwise.
1359
1360   """
1361   inst_os = OSFromDisk(instance.os)
1362   import_script = inst_os.import_script
1363
1364   os_device = instance.FindDisk(os_disk)
1365   if os_device is None:
1366     logger.Error("Can't find this device-visible name '%s'" % os_disk)
1367     return False
1368
1369   swap_device = instance.FindDisk(swap_disk)
1370   if swap_device is None:
1371     logger.Error("Can't find this device-visible name '%s'" % swap_disk)
1372     return False
1373
1374   real_os_dev = _RecursiveFindBD(os_device)
1375   if real_os_dev is None:
1376     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1377                                   str(os_device))
1378   real_os_dev.Open()
1379
1380   real_swap_dev = _RecursiveFindBD(swap_device)
1381   if real_swap_dev is None:
1382     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1383                                   str(swap_device))
1384   real_swap_dev.Open()
1385
1386   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1387                                         instance.name, int(time.time()))
1388   if not os.path.exists(constants.LOG_OS_DIR):
1389     os.mkdir(constants.LOG_OS_DIR, 0750)
1390
1391   destcmd = utils.BuildShellCmd('cat %s', src_image)
1392   remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
1393                                        destcmd)
1394
1395   comprcmd = "gunzip"
1396   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1397                                inst_os.path, import_script, instance.name,
1398                                real_os_dev.dev_path, real_swap_dev.dev_path,
1399                                logfile)
1400
1401   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1402
1403   result = utils.RunCmd(command)
1404
1405   if result.failed:
1406     logger.Error("os import command '%s' returned error: %s"
1407                  " output: %s" %
1408                  (command, result.fail_reason, result.output))
1409     return False
1410
1411   return True
1412
1413
1414 def ListExports():
1415   """Return a list of exports currently available on this machine.
1416
1417   """
1418   if os.path.isdir(constants.EXPORT_DIR):
1419     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1420   else:
1421     return []
1422
1423
1424 def RemoveExport(export):
1425   """Remove an existing export from the node.
1426
1427   Args:
1428     export: the name of the export to remove
1429
1430   Returns:
1431     False in case of error, True otherwise.
1432
1433   """
1434   target = os.path.join(constants.EXPORT_DIR, export)
1435
1436   shutil.rmtree(target)
1437   # TODO: catch some of the relevant exceptions and provide a pretty
1438   # error message if rmtree fails.
1439
1440   return True
1441
1442
1443 def RenameBlockDevices(devlist):
1444   """Rename a list of block devices.
1445
1446   The devlist argument is a list of tuples (disk, new_logical,
1447   new_physical). The return value will be a combined boolean result
1448   (True only if all renames succeeded).
1449
1450   """
1451   result = True
1452   for disk, unique_id in devlist:
1453     dev = _RecursiveFindBD(disk)
1454     if dev is None:
1455       result = False
1456       continue
1457     try:
1458       old_rpath = dev.dev_path
1459       dev.Rename(unique_id)
1460       new_rpath = dev.dev_path
1461       if old_rpath != new_rpath:
1462         DevCacheManager.RemoveCache(old_rpath)
1463         # FIXME: we should add the new cache information here, like:
1464         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1465         # but we don't have the owner here - maybe parse from existing
1466         # cache? for now, we only lose lvm data when we rename, which
1467         # is less critical than DRBD or MD
1468     except errors.BlockDeviceError, err:
1469       logger.Error("Can't rename device '%s' to '%s': %s" %
1470                    (dev, unique_id, err))
1471       result = False
1472   return result
1473
1474
1475 def _TransformFileStorageDir(file_storage_dir):
1476   """Checks whether given file_storage_dir is valid.
1477
1478   Checks wheter the given file_storage_dir is within the cluster-wide
1479   default file_storage_dir stored in SimpleStore. Only paths under that
1480   directory are allowed.
1481
1482   Args:
1483     file_storage_dir: string with path
1484
1485   Returns:
1486     normalized file_storage_dir (string) if valid, None otherwise
1487
1488   """
1489   file_storage_dir = os.path.normpath(file_storage_dir)
1490   base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
1491   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1492       base_file_storage_dir):
1493     logger.Error("file storage directory '%s' is not under base file"
1494                  " storage directory '%s'" %
1495                  (file_storage_dir, base_file_storage_dir))
1496     return None
1497   return file_storage_dir
1498
1499
1500 def CreateFileStorageDir(file_storage_dir):
1501   """Create file storage directory.
1502
1503   Args:
1504     file_storage_dir: string containing the path
1505
1506   Returns:
1507     tuple with first element a boolean indicating wheter dir
1508     creation was successful or not
1509
1510   """
1511   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1512   result = True,
1513   if not file_storage_dir:
1514     result = False,
1515   else:
1516     if os.path.exists(file_storage_dir):
1517       if not os.path.isdir(file_storage_dir):
1518         logger.Error("'%s' is not a directory" % file_storage_dir)
1519         result = False,
1520     else:
1521       try:
1522         os.makedirs(file_storage_dir, 0750)
1523       except OSError, err:
1524         logger.Error("Cannot create file storage directory '%s': %s" %
1525                      (file_storage_dir, err))
1526         result = False,
1527   return result
1528
1529
1530 def RemoveFileStorageDir(file_storage_dir):
1531   """Remove file storage directory.
1532
1533   Remove it only if it's empty. If not log an error and return.
1534
1535   Args:
1536     file_storage_dir: string containing the path
1537
1538   Returns:
1539     tuple with first element a boolean indicating wheter dir
1540     removal was successful or not
1541
1542   """
1543   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1544   result = True,
1545   if not file_storage_dir:
1546     result = False,
1547   else:
1548     if os.path.exists(file_storage_dir):
1549       if not os.path.isdir(file_storage_dir):
1550         logger.Error("'%s' is not a directory" % file_storage_dir)
1551         result = False,
1552       # deletes dir only if empty, otherwise we want to return False
1553       try:
1554         os.rmdir(file_storage_dir)
1555       except OSError, err:
1556         logger.Error("Cannot remove file storage directory '%s': %s" %
1557                      (file_storage_dir, err))
1558         result = False,
1559   return result
1560
1561
1562 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1563   """Rename the file storage directory.
1564
1565   Args:
1566     old_file_storage_dir: string containing the old path
1567     new_file_storage_dir: string containing the new path
1568
1569   Returns:
1570     tuple with first element a boolean indicating wheter dir
1571     rename was successful or not
1572
1573   """
1574   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1575   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1576   result = True,
1577   if not old_file_storage_dir or not new_file_storage_dir:
1578     result = False,
1579   else:
1580     if not os.path.exists(new_file_storage_dir):
1581       if os.path.isdir(old_file_storage_dir):
1582         try:
1583           os.rename(old_file_storage_dir, new_file_storage_dir)
1584         except OSError, err:
1585           logger.Error("Cannot rename '%s' to '%s': %s"
1586                        % (old_file_storage_dir, new_file_storage_dir, err))
1587           result =  False,
1588       else:
1589         logger.Error("'%s' is not a directory" % old_file_storage_dir)
1590         result = False,
1591     else:
1592       if os.path.exists(old_file_storage_dir):
1593         logger.Error("Cannot rename '%s' to '%s'. Both locations exist." %
1594                      old_file_storage_dir, new_file_storage_dir)
1595         result = False,
1596   return result
1597
1598
1599 def CloseBlockDevices(disks):
1600   """Closes the given block devices.
1601
1602   This means they will be switched to secondary mode (in case of DRBD).
1603
1604   """
1605   bdevs = []
1606   for cf in disks:
1607     rd = _RecursiveFindBD(cf)
1608     if rd is None:
1609       return (False, "Can't find device %s" % cf)
1610     bdevs.append(rd)
1611
1612   msg = []
1613   for rd in bdevs:
1614     try:
1615       rd.Close()
1616     except errors.BlockDeviceError, err:
1617       msg.append(str(err))
1618   if msg:
1619     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1620   else:
1621     return (True, "All devices secondary")
1622
1623
1624 class HooksRunner(object):
1625   """Hook runner.
1626
1627   This class is instantiated on the node side (ganeti-noded) and not on
1628   the master side.
1629
1630   """
1631   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1632
1633   def __init__(self, hooks_base_dir=None):
1634     """Constructor for hooks runner.
1635
1636     Args:
1637       - hooks_base_dir: if not None, this overrides the
1638         constants.HOOKS_BASE_DIR (useful for unittests)
1639
1640     """
1641     if hooks_base_dir is None:
1642       hooks_base_dir = constants.HOOKS_BASE_DIR
1643     self._BASE_DIR = hooks_base_dir
1644
1645   @staticmethod
1646   def ExecHook(script, env):
1647     """Exec one hook script.
1648
1649     Args:
1650      - script: the full path to the script
1651      - env: the environment with which to exec the script
1652
1653     """
1654     # exec the process using subprocess and log the output
1655     fdstdin = None
1656     try:
1657       fdstdin = open("/dev/null", "r")
1658       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1659                                stderr=subprocess.STDOUT, close_fds=True,
1660                                shell=False, cwd="/", env=env)
1661       output = ""
1662       try:
1663         output = child.stdout.read(4096)
1664         child.stdout.close()
1665       except EnvironmentError, err:
1666         output += "Hook script error: %s" % str(err)
1667
1668       while True:
1669         try:
1670           result = child.wait()
1671           break
1672         except EnvironmentError, err:
1673           if err.errno == errno.EINTR:
1674             continue
1675           raise
1676     finally:
1677       # try not to leak fds
1678       for fd in (fdstdin, ):
1679         if fd is not None:
1680           try:
1681             fd.close()
1682           except EnvironmentError, err:
1683             # just log the error
1684             #logger.Error("While closing fd %s: %s" % (fd, err))
1685             pass
1686
1687     return result == 0, output
1688
1689   def RunHooks(self, hpath, phase, env):
1690     """Run the scripts in the hooks directory.
1691
1692     This method will not be usually overriden by child opcodes.
1693
1694     """
1695     if phase == constants.HOOKS_PHASE_PRE:
1696       suffix = "pre"
1697     elif phase == constants.HOOKS_PHASE_POST:
1698       suffix = "post"
1699     else:
1700       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1701     rr = []
1702
1703     subdir = "%s-%s.d" % (hpath, suffix)
1704     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1705     try:
1706       dir_contents = utils.ListVisibleFiles(dir_name)
1707     except OSError, err:
1708       # must log
1709       return rr
1710
1711     # we use the standard python sort order,
1712     # so 00name is the recommended naming scheme
1713     dir_contents.sort()
1714     for relname in dir_contents:
1715       fname = os.path.join(dir_name, relname)
1716       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1717           self.RE_MASK.match(relname) is not None):
1718         rrval = constants.HKR_SKIP
1719         output = ""
1720       else:
1721         result, output = self.ExecHook(fname, env)
1722         if not result:
1723           rrval = constants.HKR_FAIL
1724         else:
1725           rrval = constants.HKR_SUCCESS
1726       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1727
1728     return rr
1729
1730
1731 class IAllocatorRunner(object):
1732   """IAllocator runner.
1733
1734   This class is instantiated on the node side (ganeti-noded) and not on
1735   the master side.
1736
1737   """
1738   def Run(self, name, idata):
1739     """Run an iallocator script.
1740
1741     Return value: tuple of:
1742        - run status (one of the IARUN_ constants)
1743        - stdout
1744        - stderr
1745        - fail reason (as from utils.RunResult)
1746
1747     """
1748     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1749                                   os.path.isfile)
1750     if alloc_script is None:
1751       return (constants.IARUN_NOTFOUND, None, None, None)
1752
1753     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1754     try:
1755       os.write(fd, idata)
1756       os.close(fd)
1757       result = utils.RunCmd([alloc_script, fin_name])
1758       if result.failed:
1759         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1760                 result.fail_reason)
1761     finally:
1762       os.unlink(fin_name)
1763
1764     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1765
1766
1767 class DevCacheManager(object):
1768   """Simple class for managing a cache of block device information.
1769
1770   """
1771   _DEV_PREFIX = "/dev/"
1772   _ROOT_DIR = constants.BDEV_CACHE_DIR
1773
1774   @classmethod
1775   def _ConvertPath(cls, dev_path):
1776     """Converts a /dev/name path to the cache file name.
1777
1778     This replaces slashes with underscores and strips the /dev
1779     prefix. It then returns the full path to the cache file
1780
1781     """
1782     if dev_path.startswith(cls._DEV_PREFIX):
1783       dev_path = dev_path[len(cls._DEV_PREFIX):]
1784     dev_path = dev_path.replace("/", "_")
1785     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1786     return fpath
1787
1788   @classmethod
1789   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1790     """Updates the cache information for a given device.
1791
1792     """
1793     if dev_path is None:
1794       logger.Error("DevCacheManager.UpdateCache got a None dev_path")
1795       return
1796     fpath = cls._ConvertPath(dev_path)
1797     if on_primary:
1798       state = "primary"
1799     else:
1800       state = "secondary"
1801     if iv_name is None:
1802       iv_name = "not_visible"
1803     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1804     try:
1805       utils.WriteFile(fpath, data=fdata)
1806     except EnvironmentError, err:
1807       logger.Error("Can't update bdev cache for %s, error %s" %
1808                    (dev_path, str(err)))
1809
1810   @classmethod
1811   def RemoveCache(cls, dev_path):
1812     """Remove data for a dev_path.
1813
1814     """
1815     if dev_path is None:
1816       logger.Error("DevCacheManager.RemoveCache got a None dev_path")
1817       return
1818     fpath = cls._ConvertPath(dev_path)
1819     try:
1820       utils.RemoveFile(fpath)
1821     except EnvironmentError, err:
1822       logger.Error("Can't update bdev cache for %s, error %s" %
1823                    (dev_path, str(err)))