OS API: support for multiple versions in an OS
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import tempfile
30 import stat
31 import errno
32 import re
33 import subprocess
34 import random
35
36 from ganeti import logger
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 # global cache for DrbdReconfigNet
48
49 # the cache is indexed by a tuple (instance_name, physical_id) of the
50 # disk, and contains as value the actual BlockDev instance (in our
51 # particular usage, only DRBD8 instances)
52
53 # the cache is cleared every time we restart DrbdReconfigNet with
54 # step=1, and it's checked for consistency (we have the same keys and
55 # size of the cache) when it's called with step!=1
56
57 _D8_RECONF_CACHE = {}
58
59 def StartMaster():
60   """Activate local node as master node.
61
62   There are two needed steps for this:
63     - run the master script
64     - register the cron script
65
66   """
67   result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
68
69   if result.failed:
70     logger.Error("could not activate cluster interface with command %s,"
71                  " error: '%s'" % (result.cmd, result.output))
72     return False
73
74   return True
75
76
77 def StopMaster():
78   """Deactivate this node as master.
79
80   This runs the master stop script.
81
82   """
83   result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])
84
85   if result.failed:
86     logger.Error("could not deactivate cluster interface with command %s,"
87                  " error: '%s'" % (result.cmd, result.output))
88     return False
89
90   return True
91
92
93 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
94   """Joins this node to the cluster.
95
96   This does the following:
97       - updates the hostkeys of the machine (rsa and dsa)
98       - adds the ssh private key to the user
99       - adds the ssh public key to the users' authorized_keys file
100
101   """
102   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
103                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
104                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
105                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
106   for name, content, mode in sshd_keys:
107     utils.WriteFile(name, data=content, mode=mode)
108
109   try:
110     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
111                                                     mkdir=True)
112   except errors.OpExecError, err:
113     logger.Error("Error while processing user ssh files: %s" % err)
114     return False
115
116   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
117     utils.WriteFile(name, data=content, mode=0600)
118
119   utils.AddAuthorizedKey(auth_keys, sshpub)
120
121   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
122
123   return True
124
125
126 def LeaveCluster():
127   """Cleans up the current node and prepares it to be removed from the cluster.
128
129   """
130   if os.path.isdir(constants.DATA_DIR):
131     for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
132       full_name = os.path.join(constants.DATA_DIR, rel_name)
133       if os.path.isfile(full_name) and not os.path.islink(full_name):
134         utils.RemoveFile(full_name)
135
136   try:
137     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
138   except errors.OpExecError, err:
139     logger.Error("Error while processing ssh files: %s" % err)
140     return
141
142   f = open(pub_key, 'r')
143   try:
144     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
145   finally:
146     f.close()
147
148   utils.RemoveFile(priv_key)
149   utils.RemoveFile(pub_key)
150
151
152 def GetNodeInfo(vgname):
153   """Gives back a hash with different informations about the node.
154
155   Returns:
156     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
157       'memory_free' : xxx, 'memory_total' : xxx }
158     where
159     vg_size is the size of the configured volume group in MiB
160     vg_free is the free size of the volume group in MiB
161     memory_dom0 is the memory allocated for domain0 in MiB
162     memory_free is the currently available (free) ram in MiB
163     memory_total is the total number of ram in MiB
164
165   """
166   outputarray = {}
167   vginfo = _GetVGInfo(vgname)
168   outputarray['vg_size'] = vginfo['vg_size']
169   outputarray['vg_free'] = vginfo['vg_free']
170
171   hyper = hypervisor.GetHypervisor()
172   hyp_info = hyper.GetNodeInfo()
173   if hyp_info is not None:
174     outputarray.update(hyp_info)
175
176   f = open("/proc/sys/kernel/random/boot_id", 'r')
177   try:
178     outputarray["bootid"] = f.read(128).rstrip("\n")
179   finally:
180     f.close()
181
182   return outputarray
183
184
185 def VerifyNode(what):
186   """Verify the status of the local node.
187
188   Args:
189     what - a dictionary of things to check:
190       'filelist' : list of files for which to compute checksums
191       'nodelist' : list of nodes we should check communication with
192       'hypervisor': run the hypervisor-specific verify
193
194   Requested files on local node are checksummed and the result returned.
195
196   The nodelist is traversed, with the following checks being made
197   for each node:
198   - known_hosts key correct
199   - correct resolving of node name (target node returns its own hostname
200     by ssh-execution of 'hostname', result compared against name in list.
201
202   """
203   result = {}
204
205   if 'hypervisor' in what:
206     result['hypervisor'] = hypervisor.GetHypervisor().Verify()
207
208   if 'filelist' in what:
209     result['filelist'] = utils.FingerprintFiles(what['filelist'])
210
211   if 'nodelist' in what:
212     result['nodelist'] = {}
213     random.shuffle(what['nodelist'])
214     for node in what['nodelist']:
215       success, message = ssh.VerifyNodeHostname(node)
216       if not success:
217         result['nodelist'][node] = message
218   if 'node-net-test' in what:
219     result['node-net-test'] = {}
220     my_name = utils.HostInfo().name
221     my_pip = my_sip = None
222     for name, pip, sip in what['node-net-test']:
223       if name == my_name:
224         my_pip = pip
225         my_sip = sip
226         break
227     if not my_pip:
228       result['node-net-test'][my_name] = ("Can't find my own"
229                                           " primary/secondary IP"
230                                           " in the node list")
231     else:
232       port = ssconf.SimpleStore().GetNodeDaemonPort()
233       for name, pip, sip in what['node-net-test']:
234         fail = []
235         if not utils.TcpPing(pip, port, source=my_pip):
236           fail.append("primary")
237         if sip != pip:
238           if not utils.TcpPing(sip, port, source=my_sip):
239             fail.append("secondary")
240         if fail:
241           result['node-net-test'][name] = ("failure using the %s"
242                                            " interface(s)" %
243                                            " and ".join(fail))
244
245   return result
246
247
248 def GetVolumeList(vg_name):
249   """Compute list of logical volumes and their size.
250
251   Returns:
252     dictionary of all partions (key) with their size (in MiB), inactive
253     and online status:
254     {'test1': ('20.06', True, True)}
255
256   """
257   lvs = {}
258   sep = '|'
259   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
260                          "--separator=%s" % sep,
261                          "-olv_name,lv_size,lv_attr", vg_name])
262   if result.failed:
263     logger.Error("Failed to list logical volumes, lvs output: %s" %
264                  result.output)
265     return result.output
266
267   for line in result.stdout.splitlines():
268     line = line.strip().rstrip(sep)
269     if line.count(sep) != 2:
270       logger.Error("Invalid line returned from lvs output: '%s'" % line)
271       continue
272     name, size, attr = line.split(sep)
273     if len(attr) != 6:
274       attr = '------'
275     inactive = attr[4] == '-'
276     online = attr[5] == 'o'
277     lvs[name] = (size, inactive, online)
278
279   return lvs
280
281
282 def ListVolumeGroups():
283   """List the volume groups and their size.
284
285   Returns:
286     Dictionary with keys volume name and values the size of the volume
287
288   """
289   return utils.ListVolumeGroups()
290
291
292 def NodeVolumes():
293   """List all volumes on this node.
294
295   """
296   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
297                          "--separator=|",
298                          "--options=lv_name,lv_size,devices,vg_name"])
299   if result.failed:
300     logger.Error("Failed to list logical volumes, lvs output: %s" %
301                  result.output)
302     return {}
303
304   def parse_dev(dev):
305     if '(' in dev:
306       return dev.split('(')[0]
307     else:
308       return dev
309
310   def map_line(line):
311     return {
312       'name': line[0].strip(),
313       'size': line[1].strip(),
314       'dev': parse_dev(line[2].strip()),
315       'vg': line[3].strip(),
316     }
317
318   return [map_line(line.split('|')) for line in result.stdout.splitlines()
319           if line.count('|') >= 3]
320
321
322 def BridgesExist(bridges_list):
323   """Check if a list of bridges exist on the current node.
324
325   Returns:
326     True if all of them exist, false otherwise
327
328   """
329   for bridge in bridges_list:
330     if not utils.BridgeExists(bridge):
331       return False
332
333   return True
334
335
336 def GetInstanceList():
337   """Provides a list of instances.
338
339   Returns:
340     A list of all running instances on the current node
341     - instance1.example.com
342     - instance2.example.com
343
344   """
345   try:
346     names = hypervisor.GetHypervisor().ListInstances()
347   except errors.HypervisorError, err:
348     logger.Error("error enumerating instances: %s" % str(err))
349     raise
350
351   return names
352
353
354 def GetInstanceInfo(instance):
355   """Gives back the informations about an instance as a dictionary.
356
357   Args:
358     instance: name of the instance (ex. instance1.example.com)
359
360   Returns:
361     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
362     where
363     memory: memory size of instance (int)
364     state: xen state of instance (string)
365     time: cpu time of instance (float)
366
367   """
368   output = {}
369
370   iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
371   if iinfo is not None:
372     output['memory'] = iinfo[2]
373     output['state'] = iinfo[4]
374     output['time'] = iinfo[5]
375
376   return output
377
378
379 def GetInstanceMigratable(instance):
380   """Gives whether an instance can be migrated.
381
382   Args:
383     instance - object representing the instance to be checked.
384
385   Returns:
386     (result, description)
387       result - whether the instance can be migrated or not
388       description - a description of the issue, if relevant
389
390   """
391   hyper = hypervisor.GetHypervisor()
392   if instance.name not in hyper.ListInstances():
393     return (False, 'not running')
394
395   for disk in instance.disks:
396     link_name = _GetBlockDevSymlinkPath(instance.name, disk.iv_name)
397     if not os.path.islink(link_name):
398       return (False, 'not restarted since ganeti 1.2.5')
399
400   return (True, '')
401
402
403 def GetAllInstancesInfo():
404   """Gather data about all instances.
405
406   This is the equivalent of `GetInstanceInfo()`, except that it
407   computes data for all instances at once, thus being faster if one
408   needs data about more than one instance.
409
410   Returns: a dictionary of dictionaries, keys being the instance name,
411     and with values:
412     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
413     where
414     memory: memory size of instance (int)
415     state: xen state of instance (string)
416     time: cpu time of instance (float)
417     vcpus: the number of cpus
418
419   """
420   output = {}
421
422   iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
423   if iinfo:
424     for name, inst_id, memory, vcpus, state, times in iinfo:
425       output[name] = {
426         'memory': memory,
427         'vcpus': vcpus,
428         'state': state,
429         'time': times,
430         }
431
432   return output
433
434
435 def AddOSToInstance(instance, os_disk, swap_disk):
436   """Add an OS to an instance.
437
438   Args:
439     instance: the instance object
440     os_disk: the instance-visible name of the os device
441     swap_disk: the instance-visible name of the swap device
442
443   """
444   inst_os = OSFromDisk(instance.os)
445
446   create_script = inst_os.create_script
447
448   os_device = instance.FindDisk(os_disk)
449   if os_device is None:
450     logger.Error("Can't find this device-visible name '%s'" % os_disk)
451     return False
452
453   swap_device = instance.FindDisk(swap_disk)
454   if swap_device is None:
455     logger.Error("Can't find this device-visible name '%s'" % swap_disk)
456     return False
457
458   real_os_dev = _RecursiveFindBD(os_device)
459   if real_os_dev is None:
460     raise errors.BlockDeviceError("Block device '%s' is not set up" %
461                                   str(os_device))
462   real_os_dev.Open()
463
464   real_swap_dev = _RecursiveFindBD(swap_device)
465   if real_swap_dev is None:
466     raise errors.BlockDeviceError("Block device '%s' is not set up" %
467                                   str(swap_device))
468   real_swap_dev.Open()
469
470   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
471                                      instance.name, int(time.time()))
472   if not os.path.exists(constants.LOG_OS_DIR):
473     os.mkdir(constants.LOG_OS_DIR, 0750)
474
475   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
476                                 inst_os.path, create_script, instance.name,
477                                 real_os_dev.dev_path, real_swap_dev.dev_path,
478                                 logfile)
479
480   result = utils.RunCmd(command)
481   if result.failed:
482     logger.Error("os create command '%s' returned error: %s, logfile: %s,"
483                  " output: %s" %
484                  (command, result.fail_reason, logfile, result.output))
485     return False
486
487   return True
488
489
490 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
491   """Run the OS rename script for an instance.
492
493   Args:
494     instance: the instance object
495     old_name: the old name of the instance
496     os_disk: the instance-visible name of the os device
497     swap_disk: the instance-visible name of the swap device
498
499   """
500   inst_os = OSFromDisk(instance.os)
501
502   script = inst_os.rename_script
503
504   os_device = instance.FindDisk(os_disk)
505   if os_device is None:
506     logger.Error("Can't find this device-visible name '%s'" % os_disk)
507     return False
508
509   swap_device = instance.FindDisk(swap_disk)
510   if swap_device is None:
511     logger.Error("Can't find this device-visible name '%s'" % swap_disk)
512     return False
513
514   real_os_dev = _RecursiveFindBD(os_device)
515   if real_os_dev is None:
516     raise errors.BlockDeviceError("Block device '%s' is not set up" %
517                                   str(os_device))
518   real_os_dev.Open()
519
520   real_swap_dev = _RecursiveFindBD(swap_device)
521   if real_swap_dev is None:
522     raise errors.BlockDeviceError("Block device '%s' is not set up" %
523                                   str(swap_device))
524   real_swap_dev.Open()
525
526   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
527                                            old_name,
528                                            instance.name, int(time.time()))
529   if not os.path.exists(constants.LOG_OS_DIR):
530     os.mkdir(constants.LOG_OS_DIR, 0750)
531
532   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
533                                 inst_os.path, script, old_name, instance.name,
534                                 real_os_dev.dev_path, real_swap_dev.dev_path,
535                                 logfile)
536
537   result = utils.RunCmd(command)
538
539   if result.failed:
540     logger.Error("os create command '%s' returned error: %s"
541                  " output: %s" %
542                  (command, result.fail_reason, result.output))
543     return False
544
545   return True
546
547
548 def _GetVGInfo(vg_name):
549   """Get informations about the volume group.
550
551   Args:
552     vg_name: the volume group
553
554   Returns:
555     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
556     where
557     vg_size is the total size of the volume group in MiB
558     vg_free is the free size of the volume group in MiB
559     pv_count are the number of physical disks in that vg
560
561   If an error occurs during gathering of data, we return the same dict
562   with keys all set to None.
563
564   """
565   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
566
567   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
568                          "--nosuffix", "--units=m", "--separator=:", vg_name])
569
570   if retval.failed:
571     errmsg = "volume group %s not present" % vg_name
572     logger.Error(errmsg)
573     return retdic
574   valarr = retval.stdout.strip().rstrip(':').split(':')
575   if len(valarr) == 3:
576     try:
577       retdic = {
578         "vg_size": int(round(float(valarr[0]), 0)),
579         "vg_free": int(round(float(valarr[1]), 0)),
580         "pv_count": int(valarr[2]),
581         }
582     except ValueError, err:
583       logger.Error("Fail to parse vgs output: %s" % str(err))
584   else:
585     logger.Error("vgs output has the wrong number of fields (expected"
586                  " three): %s" % str(valarr))
587   return retdic
588
589
590 def _GetBlockDevSymlinkPath(instance_name, device_name):
591   return os.path.join(constants.DISK_LINKS_DIR,
592                       "%s:%s" % (instance_name, device_name))
593
594
595 def _SymlinkBlockDev(instance_name, device_path, device_name):
596   """Set up symlinks to a instance's block device.
597
598   This is an auxiliary function run when an instance is start (on the primary
599   node) or when an instance is migrated (on the target node).
600
601   Args:
602     instance_name: the name of the target instance
603     device_path: path of the physical block device, on the node
604     device_name: 'virtual' name of the device
605
606   Returns:
607     absolute path to the disk's symlink
608
609   """
610   link_name = _GetBlockDevSymlinkPath(instance_name, device_name)
611   try:
612     os.symlink(device_path, link_name)
613   except OSError, err:
614     if err.errno == errno.EEXIST:
615       if (not os.path.islink(link_name) or
616           os.readlink(link_name) != device_path):
617         os.remove(link_name)
618         os.symlink(device_path, link_name)
619     else:
620       raise
621
622   return link_name
623
624
625 def _RemoveBlockDevLinks(instance_name, disks):
626   """Remove the block device symlinks belonging to the given instance.
627
628   """
629   for disk in disks:
630     link_name = _GetBlockDevSymlinkPath(instance_name, disk.iv_name)
631     if os.path.islink(link_name):
632       try:
633         os.remove(link_name)
634       except OSError, err:
635         logger.Error("Can't remove symlink '%s': %s" % (link_name, err))
636
637
638 def _GatherAndLinkBlockDevs(instance):
639   """Set up an instance's block device(s).
640
641   This is run on the primary node at instance startup. The block
642   devices must be already assembled.
643
644   Returns:
645     A list of (block_device, disk_name) tuples.
646
647   """
648   block_devices = []
649   for disk in instance.disks:
650     device = _RecursiveFindBD(disk)
651     if device is None:
652       raise errors.BlockDeviceError("Block device '%s' is not set up." %
653                                     str(disk))
654     device.Open()
655     try:
656       link_name = _SymlinkBlockDev(instance.name, device.dev_path,
657                                    disk.iv_name)
658     except OSError, e:
659       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
660                                     e.strerror)
661
662     block_devices.append((link_name, disk.iv_name))
663
664   return block_devices
665
666
667 def StartInstance(instance, extra_args):
668   """Start an instance.
669
670   Args:
671     instance - object representing the instance to be started.
672
673   """
674   running_instances = GetInstanceList()
675
676   if instance.name in running_instances:
677     return True
678
679   try:
680     block_devices = _GatherAndLinkBlockDevs(instance)
681     hyper = hypervisor.GetHypervisor()
682     hyper.StartInstance(instance, block_devices, extra_args)
683   except errors.BlockDeviceError, err:
684     logger.Error("Failed to start instance: %s" % err)
685     return False
686   except errors.HypervisorError, err:
687     logger.Error("Failed to start instance: %s" % err)
688     _RemoveBlockDevLinks(instance.name, instance.disks)
689     return False
690
691   return True
692
693
694 def ShutdownInstance(instance):
695   """Shut an instance down.
696
697   Args:
698     instance - object representing the instance to be shutdown.
699
700   """
701   running_instances = GetInstanceList()
702
703   if instance.name not in running_instances:
704     return True
705
706   hyper = hypervisor.GetHypervisor()
707   try:
708     hyper.StopInstance(instance)
709   except errors.HypervisorError, err:
710     logger.Error("Failed to stop instance: %s" % err)
711     return False
712
713   # test every 10secs for 2min
714   shutdown_ok = False
715
716   time.sleep(1)
717   for dummy in range(11):
718     if instance.name not in GetInstanceList():
719       break
720     time.sleep(10)
721   else:
722     # the shutdown did not succeed
723     logger.Error("shutdown of '%s' unsuccessful, using destroy" % instance)
724
725     try:
726       hyper.StopInstance(instance, force=True)
727     except errors.HypervisorError, err:
728       logger.Error("Failed to stop instance: %s" % err)
729       return False
730
731     time.sleep(1)
732     if instance.name in GetInstanceList():
733       logger.Error("could not shutdown instance '%s' even by destroy")
734       return False
735
736   _RemoveBlockDevLinks(instance.name, instance.disks)
737
738   return True
739
740
741 def RebootInstance(instance, reboot_type, extra_args):
742   """Reboot an instance.
743
744   Args:
745     instance    - object representing the instance to be reboot.
746     reboot_type - how to reboot [soft,hard,full]
747
748   """
749   running_instances = GetInstanceList()
750
751   if instance.name not in running_instances:
752     logger.Error("Cannot reboot instance that is not running")
753     return False
754
755   hyper = hypervisor.GetHypervisor()
756   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
757     try:
758       hyper.RebootInstance(instance)
759     except errors.HypervisorError, err:
760       logger.Error("Failed to soft reboot instance: %s" % err)
761       return False
762   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
763     try:
764       ShutdownInstance(instance)
765       StartInstance(instance, extra_args)
766     except errors.HypervisorError, err:
767       logger.Error("Failed to hard reboot instance: %s" % err)
768       return False
769   else:
770     raise errors.ParameterError("reboot_type invalid")
771
772
773   return True
774
775
776 def MigrateInstance(instance, target, live):
777   """Migrates an instance to another node.
778
779   Args:
780     instance - name of the instance to be migrated.
781     target   - node to send the instance to.
782     live     - whether to perform a live migration.
783
784   """
785   hyper = hypervisor.GetHypervisor()
786
787   try:
788     hyper.MigrateInstance(instance, target, live)
789   except errors.HypervisorError, err:
790     msg = str(err)
791     logger.Error(msg)
792     return (False, msg)
793   return (True, "Migration successfull")
794
795
796 def CreateBlockDevice(disk, size, owner, on_primary, info):
797   """Creates a block device for an instance.
798
799   Args:
800    disk: a ganeti.objects.Disk object
801    size: the size of the physical underlying device
802    owner: a string with the name of the instance
803    on_primary: a boolean indicating if it is the primary node or not
804    info: string that will be sent to the physical device creation
805
806   Returns:
807     the new unique_id of the device (this can sometime be
808     computed only after creation), or None. On secondary nodes,
809     it's not required to return anything.
810
811   """
812   clist = []
813   if disk.children:
814     for child in disk.children:
815       crdev = _RecursiveAssembleBD(child, owner, on_primary)
816       if on_primary or disk.AssembleOnSecondary():
817         # we need the children open in case the device itself has to
818         # be assembled
819         crdev.Open()
820       clist.append(crdev)
821   try:
822     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
823     if device is not None:
824       logger.Info("removing existing device %s" % disk)
825       device.Remove()
826   except errors.BlockDeviceError, err:
827     pass
828
829   device = bdev.Create(disk.dev_type, disk.physical_id,
830                        clist, size)
831   if device is None:
832     raise ValueError("Can't create child device for %s, %s" %
833                      (disk, size))
834   if on_primary or disk.AssembleOnSecondary():
835     if not device.Assemble():
836       errorstring = "Can't assemble device after creation"
837       logger.Error(errorstring)
838       raise errors.BlockDeviceError("%s, very unusual event - check the node"
839                                     " daemon logs" % errorstring)
840     device.SetSyncSpeed(constants.SYNC_SPEED)
841     if on_primary or disk.OpenOnSecondary():
842       device.Open(force=True)
843     DevCacheManager.UpdateCache(device.dev_path, owner,
844                                 on_primary, disk.iv_name)
845
846   device.SetInfo(info)
847
848   physical_id = device.unique_id
849   return physical_id
850
851
852 def RemoveBlockDevice(disk):
853   """Remove a block device.
854
855   This is intended to be called recursively.
856
857   """
858   try:
859     # since we are removing the device, allow a partial match
860     # this allows removal of broken mirrors
861     rdev = _RecursiveFindBD(disk, allow_partial=True)
862   except errors.BlockDeviceError, err:
863     # probably can't attach
864     logger.Info("Can't attach to device %s in remove" % disk)
865     rdev = None
866   if rdev is not None:
867     r_path = rdev.dev_path
868     result = rdev.Remove()
869     if result:
870       DevCacheManager.RemoveCache(r_path)
871   else:
872     result = True
873   if disk.children:
874     for child in disk.children:
875       result = result and RemoveBlockDevice(child)
876   return result
877
878
879 def _RecursiveAssembleBD(disk, owner, as_primary):
880   """Activate a block device for an instance.
881
882   This is run on the primary and secondary nodes for an instance.
883
884   This function is called recursively.
885
886   Args:
887     disk: a objects.Disk object
888     as_primary: if we should make the block device read/write
889
890   Returns:
891     the assembled device or None (in case no device was assembled)
892
893   If the assembly is not successful, an exception is raised.
894
895   """
896   children = []
897   if disk.children:
898     mcn = disk.ChildrenNeeded()
899     if mcn == -1:
900       mcn = 0 # max number of Nones allowed
901     else:
902       mcn = len(disk.children) - mcn # max number of Nones
903     for chld_disk in disk.children:
904       try:
905         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
906       except errors.BlockDeviceError, err:
907         if children.count(None) >= mcn:
908           raise
909         cdev = None
910         logger.Debug("Error in child activation: %s" % str(err))
911       children.append(cdev)
912
913   if as_primary or disk.AssembleOnSecondary():
914     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
915     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
916     result = r_dev
917     if as_primary or disk.OpenOnSecondary():
918       r_dev.Open()
919     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
920                                 as_primary, disk.iv_name)
921
922   else:
923     result = True
924   return result
925
926
927 def AssembleBlockDevice(disk, owner, as_primary):
928   """Activate a block device for an instance.
929
930   This is a wrapper over _RecursiveAssembleBD.
931
932   Returns:
933     a /dev path for primary nodes
934     True for secondary nodes
935
936   """
937   result = _RecursiveAssembleBD(disk, owner, as_primary)
938   if isinstance(result, bdev.BlockDev):
939     result = result.dev_path
940   return result
941
942
943 def ShutdownBlockDevice(disk):
944   """Shut down a block device.
945
946   First, if the device is assembled (can `Attach()`), then the device
947   is shutdown. Then the children of the device are shutdown.
948
949   This function is called recursively. Note that we don't cache the
950   children or such, as oppossed to assemble, shutdown of different
951   devices doesn't require that the upper device was active.
952
953   """
954   r_dev = _RecursiveFindBD(disk)
955   if r_dev is not None:
956     r_path = r_dev.dev_path
957     result = r_dev.Shutdown()
958     if result:
959       DevCacheManager.RemoveCache(r_path)
960   else:
961     result = True
962   if disk.children:
963     for child in disk.children:
964       result = result and ShutdownBlockDevice(child)
965   return result
966
967
968 def MirrorAddChildren(parent_cdev, new_cdevs):
969   """Extend a mirrored block device.
970
971   """
972   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
973   if parent_bdev is None:
974     logger.Error("Can't find parent device")
975     return False
976   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
977   if new_bdevs.count(None) > 0:
978     logger.Error("Can't find new device(s) to add: %s:%s" %
979                  (new_bdevs, new_cdevs))
980     return False
981   parent_bdev.AddChildren(new_bdevs)
982   return True
983
984
985 def MirrorRemoveChildren(parent_cdev, new_cdevs):
986   """Shrink a mirrored block device.
987
988   """
989   parent_bdev = _RecursiveFindBD(parent_cdev)
990   if parent_bdev is None:
991     logger.Error("Can't find parent in remove children: %s" % parent_cdev)
992     return False
993   devs = []
994   for disk in new_cdevs:
995     rpath = disk.StaticDevPath()
996     if rpath is None:
997       bd = _RecursiveFindBD(disk)
998       if bd is None:
999         logger.Error("Can't find dynamic device %s while removing children" %
1000                      disk)
1001         return False
1002       else:
1003         devs.append(bd.dev_path)
1004     else:
1005       devs.append(rpath)
1006   parent_bdev.RemoveChildren(devs)
1007   return True
1008
1009
1010 def GetMirrorStatus(disks):
1011   """Get the mirroring status of a list of devices.
1012
1013   Args:
1014     disks: list of `objects.Disk`
1015
1016   Returns:
1017     list of (mirror_done, estimated_time) tuples, which
1018     are the result of bdev.BlockDevice.CombinedSyncStatus()
1019
1020   """
1021   stats = []
1022   for dsk in disks:
1023     rbd = _RecursiveFindBD(dsk)
1024     if rbd is None:
1025       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1026     stats.append(rbd.CombinedSyncStatus())
1027   return stats
1028
1029
1030 def _RecursiveFindBD(disk, allow_partial=False):
1031   """Check if a device is activated.
1032
1033   If so, return informations about the real device.
1034
1035   Args:
1036     disk: the objects.Disk instance
1037     allow_partial: don't abort the find if a child of the
1038                    device can't be found; this is intended to be
1039                    used when repairing mirrors
1040
1041   Returns:
1042     None if the device can't be found
1043     otherwise the device instance
1044
1045   """
1046   children = []
1047   if disk.children:
1048     for chdisk in disk.children:
1049       children.append(_RecursiveFindBD(chdisk))
1050
1051   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1052
1053
1054 def FindBlockDevice(disk):
1055   """Check if a device is activated.
1056
1057   If so, return informations about the real device.
1058
1059   Args:
1060     disk: the objects.Disk instance
1061   Returns:
1062     None if the device can't be found
1063     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1064
1065   """
1066   rbd = _RecursiveFindBD(disk)
1067   if rbd is None:
1068     return rbd
1069   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1070
1071
1072 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1073   """Write a file to the filesystem.
1074
1075   This allows the master to overwrite(!) a file. It will only perform
1076   the operation if the file belongs to a list of configuration files.
1077
1078   """
1079   if not os.path.isabs(file_name):
1080     logger.Error("Filename passed to UploadFile is not absolute: '%s'" %
1081                  file_name)
1082     return False
1083
1084   allowed_files = [
1085     constants.CLUSTER_CONF_FILE,
1086     constants.ETC_HOSTS,
1087     constants.SSH_KNOWN_HOSTS_FILE,
1088     ]
1089   allowed_files.extend(ssconf.SimpleStore().GetFileList())
1090   if file_name not in allowed_files:
1091     logger.Error("Filename passed to UploadFile not in allowed"
1092                  " upload targets: '%s'" % file_name)
1093     return False
1094
1095   dir_name, small_name = os.path.split(file_name)
1096   fd, new_name = tempfile.mkstemp('.new', small_name, dir_name)
1097   # here we need to make sure we remove the temp file, if any error
1098   # leaves it in place
1099   try:
1100     os.chown(new_name, uid, gid)
1101     os.chmod(new_name, mode)
1102     os.write(fd, data)
1103     os.fsync(fd)
1104     os.utime(new_name, (atime, mtime))
1105     os.rename(new_name, file_name)
1106   finally:
1107     os.close(fd)
1108     utils.RemoveFile(new_name)
1109   return True
1110
1111
1112 def _ErrnoOrStr(err):
1113   """Format an EnvironmentError exception.
1114
1115   If the `err` argument has an errno attribute, it will be looked up
1116   and converted into a textual EXXXX description. Otherwise the string
1117   representation of the error will be returned.
1118
1119   """
1120   if hasattr(err, 'errno'):
1121     detail = errno.errorcode[err.errno]
1122   else:
1123     detail = str(err)
1124   return detail
1125
1126
1127 def _OSOndiskVersion(name, os_dir):
1128   """Compute and return the API version of a given OS.
1129
1130   This function will try to read the API version of the os given by
1131   the 'name' parameter and residing in the 'os_dir' directory.
1132
1133   Return value will be either an integer denoting the version or None in the
1134   case when this is not a valid OS name.
1135
1136   """
1137   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1138
1139   try:
1140     st = os.stat(api_file)
1141   except EnvironmentError, err:
1142     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1143                            " found (%s)" % _ErrnoOrStr(err))
1144
1145   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1146     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1147                            " a regular file")
1148
1149   try:
1150     f = open(api_file)
1151     try:
1152       api_versions = f.readlines()
1153     finally:
1154       f.close()
1155   except EnvironmentError, err:
1156     raise errors.InvalidOS(name, os_dir, "error while reading the"
1157                            " API version (%s)" % _ErrnoOrStr(err))
1158
1159   api_versions = [version.strip() for version in api_versions]
1160   try:
1161     api_versions = [int(version) for version in api_versions]
1162   except (TypeError, ValueError), err:
1163     raise errors.InvalidOS(name, os_dir,
1164                            "API version is not integer (%s)" % str(err))
1165
1166   return api_versions
1167
1168
1169 def DiagnoseOS(top_dirs=None):
1170   """Compute the validity for all OSes.
1171
1172   Returns an OS object for each name in all the given top directories
1173   (if not given defaults to constants.OS_SEARCH_PATH)
1174
1175   Returns:
1176     list of OS objects
1177
1178   """
1179   if top_dirs is None:
1180     top_dirs = constants.OS_SEARCH_PATH
1181
1182   result = []
1183   for dir_name in top_dirs:
1184     if os.path.isdir(dir_name):
1185       try:
1186         f_names = utils.ListVisibleFiles(dir_name)
1187       except EnvironmentError, err:
1188         logger.Error("Can't list the OS directory %s: %s" %
1189                      (dir_name, str(err)))
1190         break
1191       for name in f_names:
1192         try:
1193           os_inst = OSFromDisk(name, base_dir=dir_name)
1194           result.append(os_inst)
1195         except errors.InvalidOS, err:
1196           result.append(objects.OS.FromInvalidOS(err))
1197
1198   return result
1199
1200
1201 def OSFromDisk(name, base_dir=None):
1202   """Create an OS instance from disk.
1203
1204   This function will return an OS instance if the given name is a
1205   valid OS name. Otherwise, it will raise an appropriate
1206   `errors.InvalidOS` exception, detailing why this is not a valid
1207   OS.
1208
1209   Args:
1210     os_dir: Directory containing the OS scripts. Defaults to a search
1211             in all the OS_SEARCH_PATH directories.
1212
1213   """
1214
1215   if base_dir is None:
1216     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1217     if os_dir is None:
1218       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1219   else:
1220     os_dir = os.path.sep.join([base_dir, name])
1221
1222   api_versions = _OSOndiskVersion(name, os_dir)
1223
1224   if constants.OS_API_VERSION not in api_versions:
1225     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1226                            " (found %s want %s)"
1227                            % (api_versions, constants.OS_API_VERSION))
1228
1229   # OS Scripts dictionary, we will populate it with the actual script names
1230   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1231
1232   for script in os_scripts:
1233     os_scripts[script] = os.path.sep.join([os_dir, script])
1234
1235     try:
1236       st = os.stat(os_scripts[script])
1237     except EnvironmentError, err:
1238       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1239                              (script, _ErrnoOrStr(err)))
1240
1241     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1242       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1243                              script)
1244
1245     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1246       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1247                              script)
1248
1249
1250   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1251                     create_script=os_scripts['create'],
1252                     export_script=os_scripts['export'],
1253                     import_script=os_scripts['import'],
1254                     rename_script=os_scripts['rename'],
1255                     api_versions=api_versions)
1256
1257
1258 def GrowBlockDevice(disk, amount):
1259   """Grow a stack of block devices.
1260
1261   This function is called recursively, with the childrens being the
1262   first one resize.
1263
1264   Args:
1265     disk: the disk to be grown
1266
1267   Returns: a tuple of (status, result), with:
1268     status: the result (true/false) of the operation
1269     result: the error message if the operation failed, otherwise not used
1270
1271   """
1272   r_dev = _RecursiveFindBD(disk)
1273   if r_dev is None:
1274     return False, "Cannot find block device %s" % (disk,)
1275
1276   try:
1277     r_dev.Grow(amount)
1278   except errors.BlockDeviceError, err:
1279     return False, str(err)
1280
1281   return True, None
1282
1283
1284 def SnapshotBlockDevice(disk):
1285   """Create a snapshot copy of a block device.
1286
1287   This function is called recursively, and the snapshot is actually created
1288   just for the leaf lvm backend device.
1289
1290   Args:
1291     disk: the disk to be snapshotted
1292
1293   Returns:
1294     a config entry for the actual lvm device snapshotted.
1295
1296   """
1297   if disk.children:
1298     if len(disk.children) == 1:
1299       # only one child, let's recurse on it
1300       return SnapshotBlockDevice(disk.children[0])
1301     else:
1302       # more than one child, choose one that matches
1303       for child in disk.children:
1304         if child.size == disk.size:
1305           # return implies breaking the loop
1306           return SnapshotBlockDevice(child)
1307   elif disk.dev_type == constants.LD_LV:
1308     r_dev = _RecursiveFindBD(disk)
1309     if r_dev is not None:
1310       # let's stay on the safe side and ask for the full size, for now
1311       return r_dev.Snapshot(disk.size)
1312     else:
1313       return None
1314   else:
1315     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1316                                  " '%s' of type '%s'" %
1317                                  (disk.unique_id, disk.dev_type))
1318
1319
1320 def ExportSnapshot(disk, dest_node, instance):
1321   """Export a block device snapshot to a remote node.
1322
1323   Args:
1324     disk: the snapshot block device
1325     dest_node: the node to send the image to
1326     instance: instance being exported
1327
1328   Returns:
1329     True if successful, False otherwise.
1330
1331   """
1332   inst_os = OSFromDisk(instance.os)
1333   export_script = inst_os.export_script
1334
1335   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1336                                      instance.name, int(time.time()))
1337   if not os.path.exists(constants.LOG_OS_DIR):
1338     os.mkdir(constants.LOG_OS_DIR, 0750)
1339
1340   real_os_dev = _RecursiveFindBD(disk)
1341   if real_os_dev is None:
1342     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1343                                   str(disk))
1344   real_os_dev.Open()
1345
1346   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1347   destfile = disk.physical_id[1]
1348
1349   # the target command is built out of three individual commands,
1350   # which are joined by pipes; we check each individual command for
1351   # valid parameters
1352
1353   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1354                                export_script, instance.name,
1355                                real_os_dev.dev_path, logfile)
1356
1357   comprcmd = "gzip"
1358
1359   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1360                                 destdir, destdir, destfile)
1361   remotecmd = ssh.BuildSSHCmd(dest_node, constants.GANETI_RUNAS, destcmd)
1362
1363
1364
1365   # all commands have been checked, so we're safe to combine them
1366   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1367
1368   result = utils.RunCmd(command)
1369
1370   if result.failed:
1371     logger.Error("os snapshot export command '%s' returned error: %s"
1372                  " output: %s" %
1373                  (command, result.fail_reason, result.output))
1374     return False
1375
1376   return True
1377
1378
1379 def FinalizeExport(instance, snap_disks):
1380   """Write out the export configuration information.
1381
1382   Args:
1383     instance: instance configuration
1384     snap_disks: snapshot block devices
1385
1386   Returns:
1387     False in case of error, True otherwise.
1388
1389   """
1390   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1391   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1392
1393   config = objects.SerializableConfigParser()
1394
1395   config.add_section(constants.INISECT_EXP)
1396   config.set(constants.INISECT_EXP, 'version', '0')
1397   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1398   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1399   config.set(constants.INISECT_EXP, 'os', instance.os)
1400   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1401
1402   config.add_section(constants.INISECT_INS)
1403   config.set(constants.INISECT_INS, 'name', instance.name)
1404   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1405   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1406   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1407   for nic_count, nic in enumerate(instance.nics):
1408     config.set(constants.INISECT_INS, 'nic%d_mac' %
1409                nic_count, '%s' % nic.mac)
1410     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1411     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1412                '%s' % nic.bridge)
1413   # TODO: redundant: on load can read nics until it doesn't exist
1414   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1415
1416   for disk_count, disk in enumerate(snap_disks):
1417     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1418                ('%s' % disk.iv_name))
1419     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1420                ('%s' % disk.physical_id[1]))
1421     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1422                ('%d' % disk.size))
1423   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1424
1425   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1426   cfo = open(cff, 'w')
1427   try:
1428     config.write(cfo)
1429   finally:
1430     cfo.close()
1431
1432   shutil.rmtree(finaldestdir, True)
1433   shutil.move(destdir, finaldestdir)
1434
1435   return True
1436
1437
1438 def ExportInfo(dest):
1439   """Get export configuration information.
1440
1441   Args:
1442     dest: directory containing the export
1443
1444   Returns:
1445     A serializable config file containing the export info.
1446
1447   """
1448   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1449
1450   config = objects.SerializableConfigParser()
1451   config.read(cff)
1452
1453   if (not config.has_section(constants.INISECT_EXP) or
1454       not config.has_section(constants.INISECT_INS)):
1455     return None
1456
1457   return config
1458
1459
1460 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1461   """Import an os image into an instance.
1462
1463   Args:
1464     instance: the instance object
1465     os_disk: the instance-visible name of the os device
1466     swap_disk: the instance-visible name of the swap device
1467     src_node: node holding the source image
1468     src_image: path to the source image on src_node
1469
1470   Returns:
1471     False in case of error, True otherwise.
1472
1473   """
1474   inst_os = OSFromDisk(instance.os)
1475   import_script = inst_os.import_script
1476
1477   os_device = instance.FindDisk(os_disk)
1478   if os_device is None:
1479     logger.Error("Can't find this device-visible name '%s'" % os_disk)
1480     return False
1481
1482   swap_device = instance.FindDisk(swap_disk)
1483   if swap_device is None:
1484     logger.Error("Can't find this device-visible name '%s'" % swap_disk)
1485     return False
1486
1487   real_os_dev = _RecursiveFindBD(os_device)
1488   if real_os_dev is None:
1489     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1490                                   str(os_device))
1491   real_os_dev.Open()
1492
1493   real_swap_dev = _RecursiveFindBD(swap_device)
1494   if real_swap_dev is None:
1495     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1496                                   str(swap_device))
1497   real_swap_dev.Open()
1498
1499   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1500                                         instance.name, int(time.time()))
1501   if not os.path.exists(constants.LOG_OS_DIR):
1502     os.mkdir(constants.LOG_OS_DIR, 0750)
1503
1504   destcmd = utils.BuildShellCmd('cat %s', src_image)
1505   remotecmd = ssh.BuildSSHCmd(src_node, constants.GANETI_RUNAS, destcmd)
1506
1507   comprcmd = "gunzip"
1508   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1509                                inst_os.path, import_script, instance.name,
1510                                real_os_dev.dev_path, real_swap_dev.dev_path,
1511                                logfile)
1512
1513   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1514
1515   result = utils.RunCmd(command)
1516
1517   if result.failed:
1518     logger.Error("os import command '%s' returned error: %s"
1519                  " output: %s" %
1520                  (command, result.fail_reason, result.output))
1521     return False
1522
1523   return True
1524
1525
1526 def ListExports():
1527   """Return a list of exports currently available on this machine.
1528
1529   """
1530   if os.path.isdir(constants.EXPORT_DIR):
1531     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1532   else:
1533     return []
1534
1535
1536 def RemoveExport(export):
1537   """Remove an existing export from the node.
1538
1539   Args:
1540     export: the name of the export to remove
1541
1542   Returns:
1543     False in case of error, True otherwise.
1544
1545   """
1546   target = os.path.join(constants.EXPORT_DIR, export)
1547
1548   shutil.rmtree(target)
1549   # TODO: catch some of the relevant exceptions and provide a pretty
1550   # error message if rmtree fails.
1551
1552   return True
1553
1554
1555 def RenameBlockDevices(devlist):
1556   """Rename a list of block devices.
1557
1558   The devlist argument is a list of tuples (disk, new_logical,
1559   new_physical). The return value will be a combined boolean result
1560   (True only if all renames succeeded).
1561
1562   """
1563   result = True
1564   for disk, unique_id in devlist:
1565     dev = _RecursiveFindBD(disk)
1566     if dev is None:
1567       result = False
1568       continue
1569     try:
1570       old_rpath = dev.dev_path
1571       dev.Rename(unique_id)
1572       new_rpath = dev.dev_path
1573       if old_rpath != new_rpath:
1574         DevCacheManager.RemoveCache(old_rpath)
1575         # FIXME: we should add the new cache information here, like:
1576         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1577         # but we don't have the owner here - maybe parse from existing
1578         # cache? for now, we only lose lvm data when we rename, which
1579         # is less critical than DRBD or MD
1580     except errors.BlockDeviceError, err:
1581       logger.Error("Can't rename device '%s' to '%s': %s" %
1582                    (dev, unique_id, err))
1583       result = False
1584   return result
1585
1586
1587 def CloseBlockDevices(instance_name, disks):
1588   """Closes the given block devices.
1589
1590   This means they will be switched to secondary mode (in case of DRBD).
1591
1592   Args:
1593     - instance_name: if the argument is not false, then the symlinks
1594                      belonging to the instance are removed, in order
1595                      to keep only the 'active' devices symlinked
1596     - disks: a list of objects.Disk instances
1597
1598   """
1599   bdevs = []
1600   for cf in disks:
1601     rd = _RecursiveFindBD(cf)
1602     if rd is None:
1603       return (False, "Can't find device %s" % cf)
1604     bdevs.append(rd)
1605
1606   msg = []
1607   for rd in bdevs:
1608     try:
1609       rd.Close()
1610     except errors.BlockDeviceError, err:
1611       msg.append(str(err))
1612   if instance_name:
1613     _RemoveBlockDevLinks(instance_name, disks)
1614   if msg:
1615     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1616   else:
1617     return (True, "All devices secondary")
1618
1619
1620 def DrbdReconfigNet(instance_name, disks, nodes_ip, multimaster, step):
1621   """Tune the network settings on a list of drbd devices.
1622
1623   The 'step' argument has three possible values:
1624     - step 1, init the operation and identify the disks
1625     - step 2, disconnect the network
1626     - step 3, reconnect with correct settings and (if needed) set primary
1627     - step 4, change disks into read-only (secondary) mode
1628     - step 5, wait until devices are synchronized
1629
1630   """
1631   # local short name for the cache
1632   cache = _D8_RECONF_CACHE
1633
1634   # set the correct physical ID so that we can use it in the cache
1635   my_name = utils.HostInfo().name
1636   for cf in disks:
1637     cf.SetPhysicalID(my_name, nodes_ip)
1638
1639   bdevs = []
1640
1641   # note: the cache keys do not contain the multimaster setting, as we
1642   # want to reuse the cache between the to-master, (in the meantime
1643   # failover), to-secondary calls
1644
1645   if step == constants.DRBD_RECONF_RPC_INIT:
1646     # we clear the cache
1647     cache.clear()
1648     for cf in disks:
1649       key = (instance_name, cf.physical_id)
1650       rd = _RecursiveFindBD(cf)
1651       if rd is None:
1652         return (False, "Can't find device %s" % cf)
1653       bdevs.append(rd)
1654       cache[key] = rd
1655   else:
1656     # we check that the cached items are exactly what we have been passed
1657     for cf in disks:
1658       key = (instance_name, cf.physical_id)
1659       if key not in cache:
1660         return (False, "ReconfigCache has wrong contents - missing %s" % key)
1661       bdevs.append(cache[key])
1662     if len(cache) != len(disks):
1663       return (False, "ReconfigCache: wrong contents, extra items are present")
1664
1665   if step == constants.DRBD_RECONF_RPC_INIT:
1666     # nothing to do beside the discovery/caching of the disks
1667     return (True, "Disks have been identified")
1668   elif step == constants.DRBD_RECONF_RPC_DISCONNECT:
1669     # disconnect disks
1670     for rd in bdevs:
1671       try:
1672         rd.DisconnectNet()
1673       except errors.BlockDeviceError, err:
1674         logger.Debug("Failed to go into standalone mode: %s" % str(err))
1675         return (False, "Can't change network configuration: %s" % str(err))
1676     return (True, "All disks are now disconnected")
1677   elif step == constants.DRBD_RECONF_RPC_RECONNECT:
1678     if multimaster:
1679       for cf, rd in zip(disks, bdevs):
1680         try:
1681           _SymlinkBlockDev(instance_name, rd.dev_path, cf.iv_name)
1682         except EnvironmentError, err:
1683           return (False, "Can't create symlink: %s" % str(err))
1684     # reconnect disks, switch to new master configuration and if
1685     # needed primary mode
1686     for rd in bdevs:
1687       try:
1688         rd.ReAttachNet(multimaster)
1689       except errors.BlockDeviceError, err:
1690         return (False, "Can't change network configuration: %s" % str(err))
1691     # wait until the disks are connected; we need to retry the re-attach
1692     # if the device becomes standalone, as this might happen if the one
1693     # node disconnects and reconnects in a different mode before the
1694     # other node reconnects; in this case, one or both of the nodes will
1695     # decide it has wrong configuration and switch to standalone
1696     RECONNECT_TIMEOUT = 2 * 60
1697     sleep_time = 0.100 # start with 100 miliseconds
1698     timeout_limit = time.time() + RECONNECT_TIMEOUT
1699     while time.time() < timeout_limit:
1700       all_connected = True
1701       for rd in bdevs:
1702         stats = rd.GetProcStatus()
1703         if not (stats.is_connected or stats.is_in_resync):
1704           all_connected = False
1705         if stats.is_standalone:
1706           # peer had different config info and this node became
1707           # standalone, even though this should not happen with the
1708           # new staged way of changing disk configs
1709           try:
1710             rd.ReAttachNet(multimaster)
1711           except errors.BlockDeviceError, err:
1712             return (False, "Can't change network configuration: %s" % str(err))
1713       if all_connected:
1714         break
1715       time.sleep(sleep_time)
1716       sleep_time = min(5, sleep_time * 1.5)
1717     if not all_connected:
1718       return (False, "Timeout in disk reconnecting")
1719     if multimaster:
1720       # change to primary mode
1721       for rd in bdevs:
1722         rd.Open()
1723     if multimaster:
1724       msg = "multi-master and primary"
1725     else:
1726       msg = "single-master"
1727     return (True, "Disks are now configured as %s" % msg)
1728   elif step == constants.DRBD_RECONF_RPC_SECONDARY:
1729     msg = []
1730     for rd in bdevs:
1731       try:
1732         rd.Close()
1733       except errors.BlockDeviceError, err:
1734         msg.append(str(err))
1735     _RemoveBlockDevLinks(instance_name, disks)
1736     if msg:
1737       return (False, "Can't make devices secondary: %s" % ",".join(msg))
1738     else:
1739       return (True, "All devices secondary")
1740   elif step == constants.DRBD_RECONF_RPC_WFSYNC:
1741     min_resync = 100
1742     alldone = True
1743     failure = False
1744     for rd in bdevs:
1745       stats = rd.GetProcStatus()
1746       if not (stats.is_connected or stats.is_in_resync):
1747         failure = True
1748         break
1749       alldone = alldone and (not stats.is_in_resync)
1750       if stats.sync_percent is not None:
1751         min_resync = min(min_resync, stats.sync_percent)
1752     return (not failure, (alldone, min_resync))
1753   else:
1754     return (False, "Unknown reconfiguration step %s" % step)
1755
1756
1757 class HooksRunner(object):
1758   """Hook runner.
1759
1760   This class is instantiated on the node side (ganeti-noded) and not on
1761   the master side.
1762
1763   """
1764   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1765
1766   def __init__(self, hooks_base_dir=None):
1767     """Constructor for hooks runner.
1768
1769     Args:
1770       - hooks_base_dir: if not None, this overrides the
1771         constants.HOOKS_BASE_DIR (useful for unittests)
1772
1773     """
1774     if hooks_base_dir is None:
1775       hooks_base_dir = constants.HOOKS_BASE_DIR
1776     self._BASE_DIR = hooks_base_dir
1777
1778   @staticmethod
1779   def ExecHook(script, env):
1780     """Exec one hook script.
1781
1782     Args:
1783      - script: the full path to the script
1784      - env: the environment with which to exec the script
1785
1786     """
1787     # exec the process using subprocess and log the output
1788     fdstdin = None
1789     try:
1790       fdstdin = open("/dev/null", "r")
1791       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1792                                stderr=subprocess.STDOUT, close_fds=True,
1793                                shell=False, cwd="/", env=env)
1794       output = ""
1795       try:
1796         output = child.stdout.read(4096)
1797         child.stdout.close()
1798       except EnvironmentError, err:
1799         output += "Hook script error: %s" % str(err)
1800
1801       while True:
1802         try:
1803           result = child.wait()
1804           break
1805         except EnvironmentError, err:
1806           if err.errno == errno.EINTR:
1807             continue
1808           raise
1809     finally:
1810       # try not to leak fds
1811       for fd in (fdstdin, ):
1812         if fd is not None:
1813           try:
1814             fd.close()
1815           except EnvironmentError, err:
1816             # just log the error
1817             #logger.Error("While closing fd %s: %s" % (fd, err))
1818             pass
1819
1820     return result == 0, output
1821
1822   def RunHooks(self, hpath, phase, env):
1823     """Run the scripts in the hooks directory.
1824
1825     This method will not be usually overriden by child opcodes.
1826
1827     """
1828     if phase == constants.HOOKS_PHASE_PRE:
1829       suffix = "pre"
1830     elif phase == constants.HOOKS_PHASE_POST:
1831       suffix = "post"
1832     else:
1833       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1834     rr = []
1835
1836     subdir = "%s-%s.d" % (hpath, suffix)
1837     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1838     try:
1839       dir_contents = utils.ListVisibleFiles(dir_name)
1840     except OSError, err:
1841       # must log
1842       return rr
1843
1844     # we use the standard python sort order,
1845     # so 00name is the recommended naming scheme
1846     dir_contents.sort()
1847     for relname in dir_contents:
1848       fname = os.path.join(dir_name, relname)
1849       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1850           self.RE_MASK.match(relname) is not None):
1851         rrval = constants.HKR_SKIP
1852         output = ""
1853       else:
1854         result, output = self.ExecHook(fname, env)
1855         if not result:
1856           rrval = constants.HKR_FAIL
1857         else:
1858           rrval = constants.HKR_SUCCESS
1859       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1860
1861     return rr
1862
1863
1864 class IAllocatorRunner(object):
1865   """IAllocator runner.
1866
1867   This class is instantiated on the node side (ganeti-noded) and not on
1868   the master side.
1869
1870   """
1871   def Run(self, name, idata):
1872     """Run an iallocator script.
1873
1874     Return value: tuple of:
1875        - run status (one of the IARUN_ constants)
1876        - stdout
1877        - stderr
1878        - fail reason (as from utils.RunResult)
1879
1880     """
1881     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1882                                   os.path.isfile)
1883     if alloc_script is None:
1884       return (constants.IARUN_NOTFOUND, None, None, None)
1885
1886     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1887     try:
1888       os.write(fd, idata)
1889       os.close(fd)
1890       result = utils.RunCmd([alloc_script, fin_name])
1891       if result.failed:
1892         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1893                 result.fail_reason)
1894     finally:
1895       os.unlink(fin_name)
1896
1897     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1898
1899
1900 class DevCacheManager(object):
1901   """Simple class for managing a cache of block device information.
1902
1903   """
1904   _DEV_PREFIX = "/dev/"
1905   _ROOT_DIR = constants.BDEV_CACHE_DIR
1906
1907   @classmethod
1908   def _ConvertPath(cls, dev_path):
1909     """Converts a /dev/name path to the cache file name.
1910
1911     This replaces slashes with underscores and strips the /dev
1912     prefix. It then returns the full path to the cache file
1913
1914     """
1915     if dev_path.startswith(cls._DEV_PREFIX):
1916       dev_path = dev_path[len(cls._DEV_PREFIX):]
1917     dev_path = dev_path.replace("/", "_")
1918     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1919     return fpath
1920
1921   @classmethod
1922   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1923     """Updates the cache information for a given device.
1924
1925     """
1926     if dev_path is None:
1927       logger.Error("DevCacheManager.UpdateCache got a None dev_path")
1928       return
1929     fpath = cls._ConvertPath(dev_path)
1930     if on_primary:
1931       state = "primary"
1932     else:
1933       state = "secondary"
1934     if iv_name is None:
1935       iv_name = "not_visible"
1936     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1937     try:
1938       utils.WriteFile(fpath, data=fdata)
1939     except EnvironmentError, err:
1940       logger.Error("Can't update bdev cache for %s, error %s" %
1941                    (dev_path, str(err)))
1942
1943   @classmethod
1944   def RemoveCache(cls, dev_path):
1945     """Remove data for a dev_path.
1946
1947     """
1948     if dev_path is None:
1949       logger.Error("DevCacheManager.RemoveCache got a None dev_path")
1950       return
1951     fpath = cls._ConvertPath(dev_path)
1952     try:
1953       utils.RemoveFile(fpath)
1954     except EnvironmentError, err:
1955       logger.Error("Can't update bdev cache for %s, error %s" %
1956                    (dev_path, str(err)))