Make import/export use the auxiliary ssh library to build the remote commands.
[ganeti-local] / lib / backend.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import tempfile
30 import stat
31 import errno
32 import re
33 import subprocess
34
35 from ganeti import logger
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import ssh
39 from ganeti import hypervisor
40 from ganeti import constants
41 from ganeti import bdev
42 from ganeti import objects
43 from ganeti import ssconf
44
45
46 def StartMaster():
47   """Activate local node as master node.
48
49   There are two needed steps for this:
50     - run the master script
51     - register the cron script
52
53   """
54   result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
55
56   if result.failed:
57     logger.Error("could not activate cluster interface with command %s,"
58                  " error: '%s'" % (result.cmd, result.output))
59     return False
60
61   return True
62
63
64 def StopMaster():
65   """Deactivate this node as master.
66
67   This does two things:
68     - run the master stop script
69     - remove link to master cron script.
70
71   """
72   result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])
73
74   if result.failed:
75     logger.Error("could not deactivate cluster interface with command %s,"
76                  " error: '%s'" % (result.cmd, result.output))
77     return False
78
79   return True
80
81
82 def AddNode(dsa, dsapub, rsa, rsapub, ssh, sshpub):
83   """ adds the node to the cluster
84       - updates the hostkey
85       - adds the ssh-key
86       - sets the node id
87       - sets the node status to installed
88
89   """
90   f = open("/etc/ssh/ssh_host_rsa_key", 'w')
91   f.write(rsa)
92   f.close()
93
94   f = open("/etc/ssh/ssh_host_rsa_key.pub", 'w')
95   f.write(rsapub)
96   f.close()
97
98   f = open("/etc/ssh/ssh_host_dsa_key", 'w')
99   f.write(dsa)
100   f.close()
101
102   f = open("/etc/ssh/ssh_host_dsa_key.pub", 'w')
103   f.write(dsapub)
104   f.close()
105
106   if not os.path.isdir("/root/.ssh"):
107     os.mkdir("/root/.ssh")
108
109   f = open("/root/.ssh/id_dsa", 'w')
110   f.write(ssh)
111   f.close()
112
113   f = open("/root/.ssh/id_dsa.pub", 'w')
114   f.write(sshpub)
115   f.close()
116
117   f = open('/root/.ssh/id_dsa.pub', 'r')
118   try:
119     utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
120   finally:
121     f.close()
122
123   utils.RunCmd(["/etc/init.d/ssh", "restart"])
124
125   return True
126
127
128 def LeaveCluster():
129   """Cleans up the current node and prepares it to be removed from the cluster.
130
131   """
132   if os.path.exists(constants.DATA_DIR):
133     for dirpath, dirnames, filenames in os.walk(constants.DATA_DIR):
134       if dirpath == constants.DATA_DIR:
135         for i in filenames:
136           os.unlink(os.path.join(dirpath, i))
137
138   f = open('/root/.ssh/id_dsa.pub', 'r')
139   try:
140     utils.RemoveAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
141   finally:
142     f.close()
143
144   utils.RemoveFile('/root/.ssh/id_dsa')
145   utils.RemoveFile('/root/.ssh/id_dsa.pub')
146
147
148 def GetNodeInfo(vgname):
149   """ gives back a hash with different informations
150   about the node
151
152   Returns:
153     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
154       'memory_free' : xxx, 'memory_total' : xxx }
155     where
156     vg_size is the size of the configured volume group in MiB
157     vg_free is the free size of the volume group in MiB
158     memory_dom0 is the memory allocated for domain0 in MiB
159     memory_free is the currently available (free) ram in MiB
160     memory_total is the total number of ram in MiB
161
162   """
163   outputarray = {}
164   vginfo = _GetVGInfo(vgname)
165   outputarray['vg_size'] = vginfo['vg_size']
166   outputarray['vg_free'] = vginfo['vg_free']
167
168   hyper = hypervisor.GetHypervisor()
169   hyp_info = hyper.GetNodeInfo()
170   if hyp_info is not None:
171     outputarray.update(hyp_info)
172
173   return outputarray
174
175
176 def VerifyNode(what):
177   """Verify the status of the local node.
178
179   Args:
180     what - a dictionary of things to check:
181       'filelist' : list of files for which to compute checksums
182       'nodelist' : list of nodes we should check communication with
183       'hypervisor': run the hypervisor-specific verify
184
185   Requested files on local node are checksummed and the result returned.
186
187   The nodelist is traversed, with the following checks being made
188   for each node:
189   - known_hosts key correct
190   - correct resolving of node name (target node returns its own hostname
191     by ssh-execution of 'hostname', result compared against name in list.
192
193   """
194   result = {}
195
196   if 'hypervisor' in what:
197     result['hypervisor'] = hypervisor.GetHypervisor().Verify()
198
199   if 'filelist' in what:
200     result['filelist'] = utils.FingerprintFiles(what['filelist'])
201
202   if 'nodelist' in what:
203     result['nodelist'] = {}
204     for node in what['nodelist']:
205       success, message = ssh.VerifyNodeHostname(node)
206       if not success:
207         result['nodelist'][node] = message
208   return result
209
210
211 def GetVolumeList(vg_name):
212   """Compute list of logical volumes and their size.
213
214   Returns:
215     dictionary of all partions (key) with their size:
216     test1: 20.06MiB
217
218   """
219   result = utils.RunCmd(["lvs", "--noheadings", "--units=m",
220                          "-oname,size", vg_name])
221   if result.failed:
222     logger.Error("Failed to list logical volumes, lvs output: %s" %
223                  result.output)
224     return {}
225
226   lvlist = [line.split() for line in result.output.splitlines()]
227   return dict(lvlist)
228
229
230 def ListVolumeGroups():
231   """List the volume groups and their size
232
233   Returns:
234     Dictionary with keys volume name and values the size of the volume
235
236   """
237   return utils.ListVolumeGroups()
238
239
240 def NodeVolumes():
241   """List all volumes on this node.
242
243   """
244   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
245                          "--separator=|",
246                          "--options=lv_name,lv_size,devices,vg_name"])
247   if result.failed:
248     logger.Error("Failed to list logical volumes, lvs output: %s" %
249                  result.output)
250     return {}
251
252   def parse_dev(dev):
253     if '(' in dev:
254       return dev.split('(')[0]
255     else:
256       return dev
257
258   def map_line(line):
259     return {
260       'name': line[0].strip(),
261       'size': line[1].strip(),
262       'dev': parse_dev(line[2].strip()),
263       'vg': line[3].strip(),
264     }
265
266   return [map_line(line.split('|')) for line in result.output.splitlines()]
267
268
269 def BridgesExist(bridges_list):
270   """Check if a list of bridges exist on the current node
271
272   Returns:
273     True if all of them exist, false otherwise
274
275   """
276   for bridge in bridges_list:
277     if not utils.BridgeExists(bridge):
278       return False
279
280   return True
281
282
283 def GetInstanceList():
284   """ provides a list of instances
285
286   Returns:
287     A list of all running instances on the current node
288     - instance1.example.com
289     - instance2.example.com
290
291   """
292   try:
293     names = hypervisor.GetHypervisor().ListInstances()
294   except errors.HypervisorError, err:
295     logger.Error("error enumerating instances: %s" % str(err))
296     raise
297
298   return names
299
300
301 def GetInstanceInfo(instance):
302   """ gives back the informations about an instance
303   as a dictonary
304
305   Args:
306     instance: name of the instance (ex. instance1.example.com)
307
308   Returns:
309     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
310     where
311     memory: memory size of instance (int)
312     state: xen state of instance (string)
313     time: cpu time of instance (float)
314
315   """
316   output = {}
317
318   iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
319   if iinfo is not None:
320     output['memory'] = iinfo[2]
321     output['state'] = iinfo[4]
322     output['time'] = iinfo[5]
323
324   return output
325
326
327 def GetAllInstancesInfo():
328   """Gather data about all instances.
329
330   This is the equivalent of `GetInstanceInfo()`, except that it
331   computes data for all instances at once, thus being faster if one
332   needs data about more than one instance.
333
334   Returns: a dictionary of dictionaries, keys being the instance name,
335     and with values:
336     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
337     where
338     memory: memory size of instance (int)
339     state: xen state of instance (string)
340     time: cpu time of instance (float)
341     vcpus: the number of cpus
342
343   """
344   output = {}
345
346   iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
347   if iinfo:
348     for name, inst_id, memory, vcpus, state, times in iinfo:
349       output[name] = {
350         'memory': memory,
351         'vcpus': vcpus,
352         'state': state,
353         'time': times,
354         }
355
356   return output
357
358
359 def AddOSToInstance(instance, os_disk, swap_disk):
360   """Add an os to an instance.
361
362   Args:
363     instance: the instance object
364     os_disk: the instance-visible name of the os device
365     swap_disk: the instance-visible name of the swap device
366
367   """
368   inst_os = OSFromDisk(instance.os)
369
370   create_script = inst_os.create_script
371
372   for os_device in instance.disks:
373     if os_device.iv_name == os_disk:
374       break
375   else:
376     logger.Error("Can't find this device-visible name '%s'" % os_disk)
377     return False
378
379   for swap_device in instance.disks:
380     if swap_device.iv_name == swap_disk:
381       break
382   else:
383     logger.Error("Can't find this device-visible name '%s'" % swap_disk)
384     return False
385
386   real_os_dev = _RecursiveFindBD(os_device)
387   if real_os_dev is None:
388     raise errors.BlockDeviceError("Block device '%s' is not set up" %
389                                   str(os_device))
390   real_os_dev.Open()
391
392   real_swap_dev = _RecursiveFindBD(swap_device)
393   if real_swap_dev is None:
394     raise errors.BlockDeviceError("Block device '%s' is not set up" %
395                                   str(swap_device))
396   real_swap_dev.Open()
397
398   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
399                                      instance.name, int(time.time()))
400   if not os.path.exists(constants.LOG_OS_DIR):
401     os.mkdir(constants.LOG_OS_DIR, 0750)
402
403   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
404                                 inst_os.path, create_script, instance.name,
405                                 real_os_dev.dev_path, real_swap_dev.dev_path,
406                                 logfile)
407
408   result = utils.RunCmd(command)
409
410   if result.failed:
411     logger.Error("os create command '%s' returned error: %s"
412                  " output: %s" %
413                  (command, result.fail_reason, result.output))
414     return False
415
416   return True
417
418
419 def _GetVGInfo(vg_name):
420   """Get informations about the volume group.
421
422   Args:
423     vg_name: the volume group
424
425   Returns:
426     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
427     where
428     vg_size is the total size of the volume group in MiB
429     vg_free is the free size of the volume group in MiB
430     pv_count are the number of physical disks in that vg
431
432   """
433   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
434                          "--nosuffix", "--units=m", "--separator=:", vg_name])
435
436   if retval.failed:
437     errmsg = "volume group %s not present" % vg_name
438     logger.Error(errmsg)
439     raise errors.LVMError(errmsg)
440   valarr = retval.stdout.strip().split(':')
441   retdic = {
442     "vg_size": int(round(float(valarr[0]), 0)),
443     "vg_free": int(round(float(valarr[1]), 0)),
444     "pv_count": int(valarr[2]),
445     }
446   return retdic
447
448
449 def _GatherBlockDevs(instance):
450   """Set up an instance's block device(s).
451
452   This is run on the primary node at instance startup. The block
453   devices must be already assembled.
454
455   """
456   block_devices = []
457   for disk in instance.disks:
458     device = _RecursiveFindBD(disk)
459     if device is None:
460       raise errors.BlockDeviceError("Block device '%s' is not set up." %
461                                     str(disk))
462     device.Open()
463     block_devices.append((disk, device))
464   return block_devices
465
466
467 def StartInstance(instance, extra_args):
468   """Start an instance.
469
470   Args:
471     instance - name of instance to start.
472
473   """
474   running_instances = GetInstanceList()
475
476   if instance.name in running_instances:
477     return True
478
479   block_devices = _GatherBlockDevs(instance)
480   hyper = hypervisor.GetHypervisor()
481
482   try:
483     hyper.StartInstance(instance, block_devices, extra_args)
484   except errors.HypervisorError, err:
485     logger.Error("Failed to start instance: %s" % err)
486     return False
487
488   return True
489
490
491 def ShutdownInstance(instance):
492   """Shut an instance down.
493
494   Args:
495     instance - name of instance to shutdown.
496
497   """
498   running_instances = GetInstanceList()
499
500   if instance.name not in running_instances:
501     return True
502
503   hyper = hypervisor.GetHypervisor()
504   try:
505     hyper.StopInstance(instance)
506   except errors.HypervisorError, err:
507     logger.Error("Failed to stop instance: %s" % err)
508     return False
509
510   # test every 10secs for 2min
511   shutdown_ok = False
512
513   time.sleep(1)
514   for dummy in range(11):
515     if instance.name not in GetInstanceList():
516       break
517     time.sleep(10)
518   else:
519     # the shutdown did not succeed
520     logger.Error("shutdown of '%s' unsuccessful, using destroy" % instance)
521
522     try:
523       hyper.StopInstance(instance, force=True)
524     except errors.HypervisorError, err:
525       logger.Error("Failed to stop instance: %s" % err)
526       return False
527
528     time.sleep(1)
529     if instance.name in GetInstanceList():
530       logger.Error("could not shutdown instance '%s' even by destroy")
531       return False
532
533   return True
534
535
536 def CreateBlockDevice(disk, size, on_primary, info):
537   """Creates a block device for an instance.
538
539   Args:
540    bdev: a ganeti.objects.Disk object
541    size: the size of the physical underlying devices
542    do_open: if the device should be `Assemble()`-d and
543             `Open()`-ed after creation
544
545   Returns:
546     the new unique_id of the device (this can sometime be
547     computed only after creation), or None. On secondary nodes,
548     it's not required to return anything.
549
550   """
551   clist = []
552   if disk.children:
553     for child in disk.children:
554       crdev = _RecursiveAssembleBD(child, on_primary)
555       if on_primary or disk.AssembleOnSecondary():
556         # we need the children open in case the device itself has to
557         # be assembled
558         crdev.Open()
559       else:
560         crdev.Close()
561       clist.append(crdev)
562   try:
563     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
564     if device is not None:
565       logger.Info("removing existing device %s" % disk)
566       device.Remove()
567   except errors.BlockDeviceError, err:
568     pass
569
570   device = bdev.Create(disk.dev_type, disk.physical_id,
571                        clist, size)
572   if device is None:
573     raise ValueError("Can't create child device for %s, %s" %
574                      (disk, size))
575   if on_primary or disk.AssembleOnSecondary():
576     device.Assemble()
577     device.SetSyncSpeed(constants.SYNC_SPEED)
578     if on_primary or disk.OpenOnSecondary():
579       device.Open(force=True)
580
581   device.SetInfo(info)
582
583   physical_id = device.unique_id
584   return physical_id
585
586
587 def RemoveBlockDevice(disk):
588   """Remove a block device.
589
590   This is intended to be called recursively.
591
592   """
593   try:
594     # since we are removing the device, allow a partial match
595     # this allows removal of broken mirrors
596     rdev = _RecursiveFindBD(disk, allow_partial=True)
597   except errors.BlockDeviceError, err:
598     # probably can't attach
599     logger.Info("Can't attach to device %s in remove" % disk)
600     rdev = None
601   if rdev is not None:
602     result = rdev.Remove()
603   else:
604     result = True
605   if disk.children:
606     for child in disk.children:
607       result = result and RemoveBlockDevice(child)
608   return result
609
610
611 def _RecursiveAssembleBD(disk, as_primary):
612   """Activate a block device for an instance.
613
614   This is run on the primary and secondary nodes for an instance.
615
616   This function is called recursively.
617
618   Args:
619     disk: a objects.Disk object
620     as_primary: if we should make the block device read/write
621
622   Returns:
623     the assembled device or None (in case no device was assembled)
624
625   If the assembly is not successful, an exception is raised.
626
627   """
628   children = []
629   if disk.children:
630     for chld_disk in disk.children:
631       children.append(_RecursiveAssembleBD(chld_disk, as_primary))
632
633   if as_primary or disk.AssembleOnSecondary():
634     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
635     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
636     result = r_dev
637     if as_primary or disk.OpenOnSecondary():
638       r_dev.Open()
639     else:
640       r_dev.Close()
641   else:
642     result = True
643   return result
644
645
646 def AssembleBlockDevice(disk, as_primary):
647   """Activate a block device for an instance.
648
649   This is a wrapper over _RecursiveAssembleBD.
650
651   Returns:
652     a /dev path for primary nodes
653     True for secondary nodes
654
655   """
656   result = _RecursiveAssembleBD(disk, as_primary)
657   if isinstance(result, bdev.BlockDev):
658     result = result.dev_path
659   return result
660
661
662 def ShutdownBlockDevice(disk):
663   """Shut down a block device.
664
665   First, if the device is assembled (can `Attach()`), then the device
666   is shutdown. Then the children of the device are shutdown.
667
668   This function is called recursively. Note that we don't cache the
669   children or such, as oppossed to assemble, shutdown of different
670   devices doesn't require that the upper device was active.
671
672   """
673   r_dev = _RecursiveFindBD(disk)
674   if r_dev is not None:
675     result = r_dev.Shutdown()
676   else:
677     result = True
678   if disk.children:
679     for child in disk.children:
680       result = result and ShutdownBlockDevice(child)
681   return result
682
683
684 def MirrorAddChild(md_cdev, new_cdev):
685   """Extend an MD raid1 array.
686
687   """
688   md_bdev = _RecursiveFindBD(md_cdev, allow_partial=True)
689   if md_bdev is None:
690     logger.Error("Can't find md device")
691     return False
692   new_bdev = _RecursiveFindBD(new_cdev)
693   if new_bdev is None:
694     logger.Error("Can't find new device to add")
695     return False
696   new_bdev.Open()
697   md_bdev.AddChild(new_bdev)
698   return True
699
700
701 def MirrorRemoveChild(md_cdev, new_cdev):
702   """Reduce an MD raid1 array.
703
704   """
705   md_bdev = _RecursiveFindBD(md_cdev)
706   if md_bdev is None:
707     return False
708   new_bdev = _RecursiveFindBD(new_cdev)
709   if new_bdev is None:
710     return False
711   new_bdev.Open()
712   md_bdev.RemoveChild(new_bdev.dev_path)
713   return True
714
715
716 def GetMirrorStatus(disks):
717   """Get the mirroring status of a list of devices.
718
719   Args:
720     disks: list of `objects.Disk`
721
722   Returns:
723     list of (mirror_done, estimated_time) tuples, which
724     are the result of bdev.BlockDevice.CombinedSyncStatus()
725
726   """
727   stats = []
728   for dsk in disks:
729     rbd = _RecursiveFindBD(dsk)
730     if rbd is None:
731       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
732     stats.append(rbd.CombinedSyncStatus())
733   return stats
734
735
736 def _RecursiveFindBD(disk, allow_partial=False):
737   """Check if a device is activated.
738
739   If so, return informations about the real device.
740
741   Args:
742     disk: the objects.Disk instance
743     allow_partial: don't abort the find if a child of the
744                    device can't be found; this is intended to be
745                    used when repairing mirrors
746
747   Returns:
748     None if the device can't be found
749     otherwise the device instance
750
751   """
752   children = []
753   if disk.children:
754     for chdisk in disk.children:
755       children.append(_RecursiveFindBD(chdisk))
756
757   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
758
759
760 def FindBlockDevice(disk):
761   """Check if a device is activated.
762
763   If so, return informations about the real device.
764
765   Args:
766     disk: the objects.Disk instance
767   Returns:
768     None if the device can't be found
769     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
770
771   """
772   rbd = _RecursiveFindBD(disk)
773   if rbd is None:
774     return rbd
775   sync_p, est_t, is_degr = rbd.GetSyncStatus()
776   return rbd.dev_path, rbd.major, rbd.minor, sync_p, est_t, is_degr
777
778
779 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
780   """Write a file to the filesystem.
781
782   This allows the master to overwrite(!) a file. It will only perform
783   the operation if the file belongs to a list of configuration files.
784
785   """
786   if not os.path.isabs(file_name):
787     logger.Error("Filename passed to UploadFile is not absolute: '%s'" %
788                  file_name)
789     return False
790
791   allowed_files = [constants.CLUSTER_CONF_FILE, "/etc/hosts",
792                    constants.SSH_KNOWN_HOSTS_FILE]
793   allowed_files.extend(ssconf.SimpleStore().GetFileList())
794   if file_name not in allowed_files:
795     logger.Error("Filename passed to UploadFile not in allowed"
796                  " upload targets: '%s'" % file_name)
797     return False
798
799   dir_name, small_name = os.path.split(file_name)
800   fd, new_name = tempfile.mkstemp('.new', small_name, dir_name)
801   # here we need to make sure we remove the temp file, if any error
802   # leaves it in place
803   try:
804     os.chown(new_name, uid, gid)
805     os.chmod(new_name, mode)
806     os.write(fd, data)
807     os.fsync(fd)
808     os.utime(new_name, (atime, mtime))
809     os.rename(new_name, file_name)
810   finally:
811     os.close(fd)
812     utils.RemoveFile(new_name)
813   return True
814
815 def _ErrnoOrStr(err):
816   """Format an EnvironmentError exception.
817
818   If the `err` argument has an errno attribute, it will be looked up
819   and converted into a textual EXXXX description. Otherwise the string
820   representation of the error will be returned.
821
822   """
823   if hasattr(err, 'errno'):
824     detail = errno.errorcode[err.errno]
825   else:
826     detail = str(err)
827   return detail
828
829
830 def _OSOndiskVersion(name, os_dir=None):
831   """Compute and return the api version of a given OS.
832
833   This function will try to read the api version of the os given by
834   the 'name' parameter. By default, it wil use the constants.OS_DIR
835   as top-level directory for OSes, but this can be overriden by the
836   use of the os_dir parameter. Return value will be either an
837   integer denoting the version or None in the case when this is not
838   a valid OS name.
839
840   """
841   if os_dir is None:
842     os_dir = os.path.sep.join([constants.OS_DIR, name])
843
844   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
845
846   try:
847     st = os.stat(api_file)
848   except EnvironmentError, err:
849     raise errors.InvalidOS(name, "'ganeti_api_version' file not"
850                            " found (%s)" % _ErrnoOrStr(err))
851
852   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
853     raise errors.InvalidOS(name, "'ganeti_api_version' file is not"
854                            " a regular file")
855
856   try:
857     f = open(api_file)
858     try:
859       api_version = f.read(256)
860     finally:
861       f.close()
862   except EnvironmentError, err:
863     raise errors.InvalidOS(name, "error while reading the"
864                            " API version (%s)" % _ErrnoOrStr(err))
865
866   api_version = api_version.strip()
867   try:
868     api_version = int(api_version)
869   except (TypeError, ValueError), err:
870     raise errors.InvalidOS(name, "API version is not integer (%s)" % str(err))
871
872   return api_version
873
874 def DiagnoseOS(top_dir=None):
875   """Compute the validity for all OSes.
876
877   For each name in the give top_dir parameter (if not given, defaults
878   to constants.OS_DIR), it will return an object. If this is a valid
879   os, the object will be an instance of the object.OS class. If not,
880   it will be an instance of errors.InvalidOS and this signifies that
881   this name does not correspond to a valid OS.
882
883   Returns:
884     list of objects
885
886   """
887   if top_dir is None:
888     top_dir = constants.OS_DIR
889
890   try:
891     f_names = os.listdir(top_dir)
892   except EnvironmentError, err:
893     logger.Error("Can't list the OS directory: %s" % str(err))
894     return False
895   result = []
896   for name in f_names:
897     try:
898       os_inst = OSFromDisk(name, os.path.sep.join([top_dir, name]))
899       result.append(os_inst)
900     except errors.InvalidOS, err:
901       result.append(err)
902
903   return result
904
905
906 def OSFromDisk(name, os_dir=None):
907   """Create an OS instance from disk.
908
909   This function will return an OS instance if the given name is a
910   valid OS name. Otherwise, it will raise an appropriate
911   `errors.InvalidOS` exception, detailing why this is not a valid
912   OS.
913
914   """
915   if os_dir is None:
916     os_dir = os.path.sep.join([constants.OS_DIR, name])
917
918   api_version = _OSOndiskVersion(name, os_dir)
919
920   if api_version != constants.OS_API_VERSION:
921     raise errors.InvalidOS(name, "API version mismatch (found %s want %s)"
922                            % (api_version, constants.OS_API_VERSION))
923
924   # OS Scripts dictionary, we will populate it with the actual script names
925   os_scripts = {'create': '', 'export': '', 'import': ''}
926
927   for script in os_scripts:
928     os_scripts[script] = os.path.sep.join([os_dir, script])
929
930     try:
931       st = os.stat(os_scripts[script])
932     except EnvironmentError, err:
933       raise errors.InvalidOS(name, "'%s' script missing (%s)" %
934                              (script, _ErrnoOrStr(err)))
935
936     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
937       raise errors.InvalidOS(name, "'%s' script not executable" % script)
938
939     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
940       raise errors.InvalidOS(name, "'%s' is not a regular file" % script)
941
942
943   return objects.OS(name=name, path=os_dir,
944                     create_script=os_scripts['create'],
945                     export_script=os_scripts['export'],
946                     import_script=os_scripts['import'],
947                     api_version=api_version)
948
949
950 def SnapshotBlockDevice(disk):
951   """Create a snapshot copy of a block device.
952
953   This function is called recursively, and the snapshot is actually created
954   just for the leaf lvm backend device.
955
956   Args:
957     disk: the disk to be snapshotted
958
959   Returns:
960     a config entry for the actual lvm device snapshotted.
961
962   """
963   if disk.children:
964     if len(disk.children) == 1:
965       # only one child, let's recurse on it
966       return SnapshotBlockDevice(disk.children[0])
967     else:
968       # more than one child, choose one that matches
969       for child in disk.children:
970         if child.size == disk.size:
971           # return implies breaking the loop
972           return SnapshotBlockDevice(child)
973   elif disk.dev_type == "lvm":
974     r_dev = _RecursiveFindBD(disk)
975     if r_dev is not None:
976       # let's stay on the safe side and ask for the full size, for now
977       return r_dev.Snapshot(disk.size)
978     else:
979       return None
980   else:
981     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
982                                  "'%s' of type '%s'" %
983                                  (disk.unique_id, disk.dev_type))
984
985
986 def ExportSnapshot(disk, dest_node, instance):
987   """Export a block device snapshot to a remote node.
988
989   Args:
990     disk: the snapshot block device
991     dest_node: the node to send the image to
992     instance: instance being exported
993
994   Returns:
995     True if successful, False otherwise.
996
997   """
998   inst_os = OSFromDisk(instance.os)
999   export_script = inst_os.export_script
1000
1001   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1002                                      instance.name, int(time.time()))
1003   if not os.path.exists(constants.LOG_OS_DIR):
1004     os.mkdir(constants.LOG_OS_DIR, 0750)
1005
1006   real_os_dev = _RecursiveFindBD(disk)
1007   if real_os_dev is None:
1008     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1009                                   str(disk))
1010   real_os_dev.Open()
1011
1012   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1013   destfile = disk.physical_id[1]
1014
1015   # the target command is built out of three individual commands,
1016   # which are joined by pipes; we check each individual command for
1017   # valid parameters
1018
1019   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1020                                export_script, instance.name,
1021                                real_os_dev.dev_path, logfile)
1022
1023   comprcmd = "gzip"
1024
1025   destcmd = utils.BuildShellCmd("mkdir -p %s; cat > %s/%s", 
1026                                 destdir, destdir, destfile)
1027   remotecmd = ssh.BuildSSHCmd(dest_node, 'root', destcmd)
1028   
1029   
1030
1031   # all commands have been checked, so we're safe to combine them
1032   command = '|'.join([expcmd, comprcmd, ' '.join(remotecmd)])
1033
1034   result = utils.RunCmd(command)
1035
1036   if result.failed:
1037     logger.Error("os snapshot export command '%s' returned error: %s"
1038                  " output: %s" %
1039                  (command, result.fail_reason, result.output))
1040     return False
1041
1042   return True
1043
1044
1045 def FinalizeExport(instance, snap_disks):
1046   """Write out the export configuration information.
1047
1048   Args:
1049     instance: instance configuration
1050     snap_disks: snapshot block devices
1051
1052   Returns:
1053     False in case of error, True otherwise.
1054
1055   """
1056   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1057   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1058
1059   config = objects.SerializableConfigParser()
1060
1061   config.add_section(constants.INISECT_EXP)
1062   config.set(constants.INISECT_EXP, 'version', '0')
1063   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1064   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1065   config.set(constants.INISECT_EXP, 'os', instance.os)
1066   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1067
1068   config.add_section(constants.INISECT_INS)
1069   config.set(constants.INISECT_INS, 'name', instance.name)
1070   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1071   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1072   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1073   for nic_count, nic in enumerate(instance.nics):
1074     config.set(constants.INISECT_INS, 'nic%d_mac' %
1075                nic_count, '%s' % nic.mac)
1076     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1077   # TODO: redundant: on load can read nics until it doesn't exist
1078   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1079
1080   for disk_count, disk in enumerate(snap_disks):
1081     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1082                ('%s' % disk.iv_name))
1083     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1084                ('%s' % disk.physical_id[1]))
1085     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1086                ('%d' % disk.size))
1087   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1088
1089   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1090   cfo = open(cff, 'w')
1091   try:
1092     config.write(cfo)
1093   finally:
1094     cfo.close()
1095
1096   shutil.rmtree(finaldestdir, True)
1097   shutil.move(destdir, finaldestdir)
1098
1099   return True
1100
1101
1102 def ExportInfo(dest):
1103   """Get export configuration information.
1104
1105   Args:
1106     dest: directory containing the export
1107
1108   Returns:
1109     A serializable config file containing the export info.
1110
1111   """
1112   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1113
1114   config = objects.SerializableConfigParser()
1115   config.read(cff)
1116
1117   if (not config.has_section(constants.INISECT_EXP) or
1118       not config.has_section(constants.INISECT_INS)):
1119     return None
1120
1121   return config
1122
1123
1124 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1125   """Import an os image into an instance.
1126
1127   Args:
1128     instance: the instance object
1129     os_disk: the instance-visible name of the os device
1130     swap_disk: the instance-visible name of the swap device
1131     src_node: node holding the source image
1132     src_image: path to the source image on src_node
1133
1134   Returns:
1135     False in case of error, True otherwise.
1136
1137   """
1138   inst_os = OSFromDisk(instance.os)
1139   import_script = inst_os.import_script
1140
1141   for os_device in instance.disks:
1142     if os_device.iv_name == os_disk:
1143       break
1144   else:
1145     logger.Error("Can't find this device-visible name '%s'" % os_disk)
1146     return False
1147
1148   for swap_device in instance.disks:
1149     if swap_device.iv_name == swap_disk:
1150       break
1151   else:
1152     logger.Error("Can't find this device-visible name '%s'" % swap_disk)
1153     return False
1154
1155   real_os_dev = _RecursiveFindBD(os_device)
1156   if real_os_dev is None:
1157     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1158                                   str(os_device))
1159   real_os_dev.Open()
1160
1161   real_swap_dev = _RecursiveFindBD(swap_device)
1162   if real_swap_dev is None:
1163     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1164                                   str(swap_device))
1165   real_swap_dev.Open()
1166
1167   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1168                                         instance.name, int(time.time()))
1169   if not os.path.exists(constants.LOG_OS_DIR):
1170     os.mkdir(constants.LOG_OS_DIR, 0750)
1171
1172   destcmd = utils.BuildShellCmd('cat %s', src_image)
1173   remotecmd = ssh.BuildSSHCmd(src_node, 'root', destcmd)
1174
1175   comprcmd = "gunzip"
1176   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1177                                inst_os.path, import_script, instance.name,
1178                                real_os_dev.dev_path, real_swap_dev.dev_path,
1179                                logfile)
1180
1181   command = '|'.join([' '.join(remotecmd), comprcmd, impcmd])
1182
1183   result = utils.RunCmd(command)
1184
1185   if result.failed:
1186     logger.Error("os import command '%s' returned error: %s"
1187                  " output: %s" %
1188                  (command, result.fail_reason, result.output))
1189     return False
1190
1191   return True
1192
1193
1194 def ListExports():
1195   """Return a list of exports currently available on this machine.
1196
1197   """
1198   if os.path.isdir(constants.EXPORT_DIR):
1199     return os.listdir(constants.EXPORT_DIR)
1200   else:
1201     return []
1202
1203
1204 def RemoveExport(export):
1205   """Remove an existing export from the node.
1206
1207   Args:
1208     export: the name of the export to remove
1209
1210   Returns:
1211     False in case of error, True otherwise.
1212
1213   """
1214   target = os.path.join(constants.EXPORT_DIR, export)
1215
1216   shutil.rmtree(target)
1217   # TODO: catch some of the relevant exceptions and provide a pretty
1218   # error message if rmtree fails.
1219
1220   return True
1221
1222
1223 class HooksRunner(object):
1224   """Hook runner.
1225
1226   This class is instantiated on the node side (ganeti-noded) and not on
1227   the master side.
1228
1229   """
1230   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1231
1232   def __init__(self, hooks_base_dir=None):
1233     """Constructor for hooks runner.
1234
1235     Args:
1236       - hooks_base_dir: if not None, this overrides the
1237         constants.HOOKS_BASE_DIR (useful for unittests)
1238       - logs_base_dir: if not None, this overrides the
1239         constants.LOG_HOOKS_DIR (useful for unittests)
1240       - logging: enable or disable logging of script output
1241
1242     """
1243     if hooks_base_dir is None:
1244       hooks_base_dir = constants.HOOKS_BASE_DIR
1245     self._BASE_DIR = hooks_base_dir
1246
1247   @staticmethod
1248   def ExecHook(script, env):
1249     """Exec one hook script.
1250
1251     Args:
1252      - phase: the phase
1253      - script: the full path to the script
1254      - env: the environment with which to exec the script
1255
1256     """
1257     # exec the process using subprocess and log the output
1258     fdstdin = None
1259     try:
1260       fdstdin = open("/dev/null", "r")
1261       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1262                                stderr=subprocess.STDOUT, close_fds=True,
1263                                shell=False, cwd="/",env=env)
1264       output = ""
1265       try:
1266         output = child.stdout.read(4096)
1267         child.stdout.close()
1268       except EnvironmentError, err:
1269         output += "Hook script error: %s" % str(err)
1270
1271       while True:
1272         try:
1273           result = child.wait()
1274           break
1275         except EnvironmentError, err:
1276           if err.errno == errno.EINTR:
1277             continue
1278           raise
1279     finally:
1280       # try not to leak fds
1281       for fd in (fdstdin, ):
1282         if fd is not None:
1283           try:
1284             fd.close()
1285           except EnvironmentError, err:
1286             # just log the error
1287             #logger.Error("While closing fd %s: %s" % (fd, err))
1288             pass
1289
1290     return result == 0, output
1291
1292   def RunHooks(self, hpath, phase, env):
1293     """Run the scripts in the hooks directory.
1294
1295     This method will not be usually overriden by child opcodes.
1296
1297     """
1298     if phase == constants.HOOKS_PHASE_PRE:
1299       suffix = "pre"
1300     elif phase == constants.HOOKS_PHASE_POST:
1301       suffix = "post"
1302     else:
1303       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1304     rr = []
1305
1306     subdir = "%s-%s.d" % (hpath, suffix)
1307     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1308     try:
1309       dir_contents = os.listdir(dir_name)
1310     except OSError, err:
1311       # must log
1312       return rr
1313
1314     # we use the standard python sort order,
1315     # so 00name is the recommended naming scheme
1316     dir_contents.sort()
1317     for relname in dir_contents:
1318       fname = os.path.join(dir_name, relname)
1319       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1320           self.RE_MASK.match(relname) is not None):
1321         rrval = constants.HKR_SKIP
1322         output = ""
1323       else:
1324         result, output = self.ExecHook(fname, env)
1325         if not result:
1326           rrval = constants.HKR_FAIL
1327         else:
1328           rrval = constants.HKR_SUCCESS
1329       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1330
1331     return rr