Add KVM hypervisor code
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetSshRunner():
48   return ssh.SshRunner()
49
50
51 def _CleanDirectory(path, exclude=[]):
52   """Removes all regular files in a directory.
53
54   @param exclude: List of files to be excluded.
55   @type exclude: list
56
57   """
58   if not os.path.isdir(path):
59     return
60
61   # Normalize excluded paths
62   exclude = [os.path.normpath(i) for i in exclude]
63
64   for rel_name in utils.ListVisibleFiles(path):
65     full_name = os.path.normpath(os.path.join(path, rel_name))
66     if full_name in exclude:
67       continue
68     if os.path.isfile(full_name) and not os.path.islink(full_name):
69       utils.RemoveFile(full_name)
70
71
72 def _JobQueuePurge(keep_lock):
73   """Removes job queue files and archived jobs
74
75   """
76   if keep_lock:
77     exclude = [constants.JOB_QUEUE_LOCK_FILE]
78   else:
79     exclude = []
80
81   _CleanDirectory(constants.QUEUE_DIR, exclude=exclude)
82   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
83
84
85 def _GetMasterInfo():
86   """Return the master ip and netdev.
87
88   """
89   try:
90     ss = ssconf.SimpleStore()
91     master_netdev = ss.GetMasterNetdev()
92     master_ip = ss.GetMasterIP()
93   except errors.ConfigurationError, err:
94     logging.exception("Cluster configuration incomplete")
95     return (None, None)
96   return (master_netdev, master_ip)
97
98
99 def StartMaster(start_daemons):
100   """Activate local node as master node.
101
102   The function will always try activate the IP address of the master
103   (if someone else has it, then it won't). Then, if the start_daemons
104   parameter is True, it will also start the master daemons
105   (ganet-masterd and ganeti-rapi).
106
107   """
108   ok = True
109   master_netdev, master_ip = _GetMasterInfo()
110   if not master_netdev:
111     return False
112
113   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
114     if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
115                      source=constants.LOCALHOST_IP_ADDRESS):
116       # we already have the ip:
117       logging.debug("Already started")
118     else:
119       logging.error("Someone else has the master ip, not activating")
120       ok = False
121   else:
122     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
123                            "dev", master_netdev, "label",
124                            "%s:0" % master_netdev])
125     if result.failed:
126       logging.error("Can't activate master IP: %s", result.output)
127       ok = False
128
129     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
130                            "-s", master_ip, master_ip])
131     # we'll ignore the exit code of arping
132
133   # and now start the master and rapi daemons
134   if start_daemons:
135     for daemon in 'ganeti-masterd', 'ganeti-rapi':
136       result = utils.RunCmd([daemon])
137       if result.failed:
138         logging.error("Can't start daemon %s: %s", daemon, result.output)
139         ok = False
140   return ok
141
142
143 def StopMaster(stop_daemons):
144   """Deactivate this node as master.
145
146   The function will always try to deactivate the IP address of the
147   master. Then, if the stop_daemons parameter is True, it will also
148   stop the master daemons (ganet-masterd and ganeti-rapi).
149
150   """
151   master_netdev, master_ip = _GetMasterInfo()
152   if not master_netdev:
153     return False
154
155   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
156                          "dev", master_netdev])
157   if result.failed:
158     logging.error("Can't remove the master IP, error: %s", result.output)
159     # but otherwise ignore the failure
160
161   if stop_daemons:
162     # stop/kill the rapi and the master daemon
163     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
164       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
165
166   return True
167
168
169 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
170   """Joins this node to the cluster.
171
172   This does the following:
173       - updates the hostkeys of the machine (rsa and dsa)
174       - adds the ssh private key to the user
175       - adds the ssh public key to the users' authorized_keys file
176
177   """
178   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
179                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
180                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
181                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
182   for name, content, mode in sshd_keys:
183     utils.WriteFile(name, data=content, mode=mode)
184
185   try:
186     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
187                                                     mkdir=True)
188   except errors.OpExecError, err:
189     logging.exception("Error while processing user ssh files")
190     return False
191
192   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
193     utils.WriteFile(name, data=content, mode=0600)
194
195   utils.AddAuthorizedKey(auth_keys, sshpub)
196
197   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
198
199   return True
200
201
202 def LeaveCluster():
203   """Cleans up the current node and prepares it to be removed from the cluster.
204
205   """
206   _CleanDirectory(constants.DATA_DIR)
207
208   # The lock can be removed because we're going to quit anyway.
209   _JobQueuePurge(keep_lock=False)
210
211   try:
212     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
213   except errors.OpExecError:
214     logging.exception("Error while processing ssh files")
215     return
216
217   f = open(pub_key, 'r')
218   try:
219     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
220   finally:
221     f.close()
222
223   utils.RemoveFile(priv_key)
224   utils.RemoveFile(pub_key)
225
226   # Return a reassuring string to the caller, and quit
227   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
228
229
230 def GetNodeInfo(vgname):
231   """Gives back a hash with different informations about the node.
232
233   Returns:
234     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
235       'memory_free' : xxx, 'memory_total' : xxx }
236     where
237     vg_size is the size of the configured volume group in MiB
238     vg_free is the free size of the volume group in MiB
239     memory_dom0 is the memory allocated for domain0 in MiB
240     memory_free is the currently available (free) ram in MiB
241     memory_total is the total number of ram in MiB
242
243   """
244   outputarray = {}
245   vginfo = _GetVGInfo(vgname)
246   outputarray['vg_size'] = vginfo['vg_size']
247   outputarray['vg_free'] = vginfo['vg_free']
248
249   hyper = hypervisor.GetHypervisor()
250   hyp_info = hyper.GetNodeInfo()
251   if hyp_info is not None:
252     outputarray.update(hyp_info)
253
254   f = open("/proc/sys/kernel/random/boot_id", 'r')
255   try:
256     outputarray["bootid"] = f.read(128).rstrip("\n")
257   finally:
258     f.close()
259
260   return outputarray
261
262
263 def VerifyNode(what):
264   """Verify the status of the local node.
265
266   Args:
267     what - a dictionary of things to check:
268       'filelist' : list of files for which to compute checksums
269       'nodelist' : list of nodes we should check communication with
270       'hypervisor': run the hypervisor-specific verify
271
272   Requested files on local node are checksummed and the result returned.
273
274   The nodelist is traversed, with the following checks being made
275   for each node:
276   - known_hosts key correct
277   - correct resolving of node name (target node returns its own hostname
278     by ssh-execution of 'hostname', result compared against name in list.
279
280   """
281   result = {}
282
283   if 'hypervisor' in what:
284     result['hypervisor'] = hypervisor.GetHypervisor().Verify()
285
286   if 'filelist' in what:
287     result['filelist'] = utils.FingerprintFiles(what['filelist'])
288
289   if 'nodelist' in what:
290     result['nodelist'] = {}
291     random.shuffle(what['nodelist'])
292     for node in what['nodelist']:
293       success, message = _GetSshRunner().VerifyNodeHostname(node)
294       if not success:
295         result['nodelist'][node] = message
296   if 'node-net-test' in what:
297     result['node-net-test'] = {}
298     my_name = utils.HostInfo().name
299     my_pip = my_sip = None
300     for name, pip, sip in what['node-net-test']:
301       if name == my_name:
302         my_pip = pip
303         my_sip = sip
304         break
305     if not my_pip:
306       result['node-net-test'][my_name] = ("Can't find my own"
307                                           " primary/secondary IP"
308                                           " in the node list")
309     else:
310       port = ssconf.SimpleStore().GetNodeDaemonPort()
311       for name, pip, sip in what['node-net-test']:
312         fail = []
313         if not utils.TcpPing(pip, port, source=my_pip):
314           fail.append("primary")
315         if sip != pip:
316           if not utils.TcpPing(sip, port, source=my_sip):
317             fail.append("secondary")
318         if fail:
319           result['node-net-test'][name] = ("failure using the %s"
320                                            " interface(s)" %
321                                            " and ".join(fail))
322
323   return result
324
325
326 def GetVolumeList(vg_name):
327   """Compute list of logical volumes and their size.
328
329   Returns:
330     dictionary of all partions (key) with their size (in MiB), inactive
331     and online status:
332     {'test1': ('20.06', True, True)}
333
334   """
335   lvs = {}
336   sep = '|'
337   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
338                          "--separator=%s" % sep,
339                          "-olv_name,lv_size,lv_attr", vg_name])
340   if result.failed:
341     logging.error("Failed to list logical volumes, lvs output: %s",
342                   result.output)
343     return result.output
344
345   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
346   for line in result.stdout.splitlines():
347     line = line.strip()
348     match = valid_line_re.match(line)
349     if not match:
350       logging.error("Invalid line returned from lvs output: '%s'", line)
351       continue
352     name, size, attr = match.groups()
353     inactive = attr[4] == '-'
354     online = attr[5] == 'o'
355     lvs[name] = (size, inactive, online)
356
357   return lvs
358
359
360 def ListVolumeGroups():
361   """List the volume groups and their size.
362
363   Returns:
364     Dictionary with keys volume name and values the size of the volume
365
366   """
367   return utils.ListVolumeGroups()
368
369
370 def NodeVolumes():
371   """List all volumes on this node.
372
373   """
374   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
375                          "--separator=|",
376                          "--options=lv_name,lv_size,devices,vg_name"])
377   if result.failed:
378     logging.error("Failed to list logical volumes, lvs output: %s",
379                   result.output)
380     return {}
381
382   def parse_dev(dev):
383     if '(' in dev:
384       return dev.split('(')[0]
385     else:
386       return dev
387
388   def map_line(line):
389     return {
390       'name': line[0].strip(),
391       'size': line[1].strip(),
392       'dev': parse_dev(line[2].strip()),
393       'vg': line[3].strip(),
394     }
395
396   return [map_line(line.split('|')) for line in result.stdout.splitlines()
397           if line.count('|') >= 3]
398
399
400 def BridgesExist(bridges_list):
401   """Check if a list of bridges exist on the current node.
402
403   Returns:
404     True if all of them exist, false otherwise
405
406   """
407   for bridge in bridges_list:
408     if not utils.BridgeExists(bridge):
409       return False
410
411   return True
412
413
414 def GetInstanceList():
415   """Provides a list of instances.
416
417   Returns:
418     A list of all running instances on the current node
419     - instance1.example.com
420     - instance2.example.com
421
422   """
423   try:
424     names = hypervisor.GetHypervisor().ListInstances()
425   except errors.HypervisorError, err:
426     logging.exception("Error enumerating instances")
427     raise
428
429   return names
430
431
432 def GetInstanceInfo(instance):
433   """Gives back the informations about an instance as a dictionary.
434
435   Args:
436     instance: name of the instance (ex. instance1.example.com)
437
438   Returns:
439     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
440     where
441     memory: memory size of instance (int)
442     state: xen state of instance (string)
443     time: cpu time of instance (float)
444
445   """
446   output = {}
447
448   iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
449   if iinfo is not None:
450     output['memory'] = iinfo[2]
451     output['state'] = iinfo[4]
452     output['time'] = iinfo[5]
453
454   return output
455
456
457 def GetAllInstancesInfo():
458   """Gather data about all instances.
459
460   This is the equivalent of `GetInstanceInfo()`, except that it
461   computes data for all instances at once, thus being faster if one
462   needs data about more than one instance.
463
464   Returns: a dictionary of dictionaries, keys being the instance name,
465     and with values:
466     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
467     where
468     memory: memory size of instance (int)
469     state: xen state of instance (string)
470     time: cpu time of instance (float)
471     vcpus: the number of cpus
472
473   """
474   output = {}
475
476   iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
477   if iinfo:
478     for name, inst_id, memory, vcpus, state, times in iinfo:
479       output[name] = {
480         'memory': memory,
481         'vcpus': vcpus,
482         'state': state,
483         'time': times,
484         }
485
486   return output
487
488
489 def AddOSToInstance(instance, os_disk, swap_disk):
490   """Add an OS to an instance.
491
492   Args:
493     instance: the instance object
494     os_disk: the instance-visible name of the os device
495     swap_disk: the instance-visible name of the swap device
496
497   """
498   inst_os = OSFromDisk(instance.os)
499
500   create_script = inst_os.create_script
501
502   os_device = instance.FindDisk(os_disk)
503   if os_device is None:
504     logging.error("Can't find this device-visible name '%s'", os_disk)
505     return False
506
507   swap_device = instance.FindDisk(swap_disk)
508   if swap_device is None:
509     logging.error("Can't find this device-visible name '%s'", swap_disk)
510     return False
511
512   real_os_dev = _RecursiveFindBD(os_device)
513   if real_os_dev is None:
514     raise errors.BlockDeviceError("Block device '%s' is not set up" %
515                                   str(os_device))
516   real_os_dev.Open()
517
518   real_swap_dev = _RecursiveFindBD(swap_device)
519   if real_swap_dev is None:
520     raise errors.BlockDeviceError("Block device '%s' is not set up" %
521                                   str(swap_device))
522   real_swap_dev.Open()
523
524   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
525                                      instance.name, int(time.time()))
526   if not os.path.exists(constants.LOG_OS_DIR):
527     os.mkdir(constants.LOG_OS_DIR, 0750)
528
529   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
530                                 inst_os.path, create_script, instance.name,
531                                 real_os_dev.dev_path, real_swap_dev.dev_path,
532                                 logfile)
533
534   result = utils.RunCmd(command)
535   if result.failed:
536     logging.error("os create command '%s' returned error: %s, logfile: %s,"
537                   " output: %s", command, result.fail_reason, logfile,
538                   result.output)
539     return False
540
541   return True
542
543
544 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
545   """Run the OS rename script for an instance.
546
547   Args:
548     instance: the instance object
549     old_name: the old name of the instance
550     os_disk: the instance-visible name of the os device
551     swap_disk: the instance-visible name of the swap device
552
553   """
554   inst_os = OSFromDisk(instance.os)
555
556   script = inst_os.rename_script
557
558   os_device = instance.FindDisk(os_disk)
559   if os_device is None:
560     logging.error("Can't find this device-visible name '%s'", os_disk)
561     return False
562
563   swap_device = instance.FindDisk(swap_disk)
564   if swap_device is None:
565     logging.error("Can't find this device-visible name '%s'", swap_disk)
566     return False
567
568   real_os_dev = _RecursiveFindBD(os_device)
569   if real_os_dev is None:
570     raise errors.BlockDeviceError("Block device '%s' is not set up" %
571                                   str(os_device))
572   real_os_dev.Open()
573
574   real_swap_dev = _RecursiveFindBD(swap_device)
575   if real_swap_dev is None:
576     raise errors.BlockDeviceError("Block device '%s' is not set up" %
577                                   str(swap_device))
578   real_swap_dev.Open()
579
580   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
581                                            old_name,
582                                            instance.name, int(time.time()))
583   if not os.path.exists(constants.LOG_OS_DIR):
584     os.mkdir(constants.LOG_OS_DIR, 0750)
585
586   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
587                                 inst_os.path, script, old_name, instance.name,
588                                 real_os_dev.dev_path, real_swap_dev.dev_path,
589                                 logfile)
590
591   result = utils.RunCmd(command)
592
593   if result.failed:
594     logging.error("os create command '%s' returned error: %s output: %s",
595                   command, result.fail_reason, result.output)
596     return False
597
598   return True
599
600
601 def _GetVGInfo(vg_name):
602   """Get informations about the volume group.
603
604   Args:
605     vg_name: the volume group
606
607   Returns:
608     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
609     where
610     vg_size is the total size of the volume group in MiB
611     vg_free is the free size of the volume group in MiB
612     pv_count are the number of physical disks in that vg
613
614   If an error occurs during gathering of data, we return the same dict
615   with keys all set to None.
616
617   """
618   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
619
620   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
621                          "--nosuffix", "--units=m", "--separator=:", vg_name])
622
623   if retval.failed:
624     logging.error("volume group %s not present", vg_name)
625     return retdic
626   valarr = retval.stdout.strip().rstrip(':').split(':')
627   if len(valarr) == 3:
628     try:
629       retdic = {
630         "vg_size": int(round(float(valarr[0]), 0)),
631         "vg_free": int(round(float(valarr[1]), 0)),
632         "pv_count": int(valarr[2]),
633         }
634     except ValueError, err:
635       logging.exception("Fail to parse vgs output")
636   else:
637     logging.error("vgs output has the wrong number of fields (expected"
638                   " three): %s", str(valarr))
639   return retdic
640
641
642 def _GatherBlockDevs(instance):
643   """Set up an instance's block device(s).
644
645   This is run on the primary node at instance startup. The block
646   devices must be already assembled.
647
648   """
649   block_devices = []
650   for disk in instance.disks:
651     device = _RecursiveFindBD(disk)
652     if device is None:
653       raise errors.BlockDeviceError("Block device '%s' is not set up." %
654                                     str(disk))
655     device.Open()
656     block_devices.append((disk, device))
657   return block_devices
658
659
660 def StartInstance(instance, extra_args):
661   """Start an instance.
662
663   Args:
664     instance - name of instance to start.
665
666   """
667   running_instances = GetInstanceList()
668
669   if instance.name in running_instances:
670     return True
671
672   block_devices = _GatherBlockDevs(instance)
673   hyper = hypervisor.GetHypervisor()
674
675   try:
676     hyper.StartInstance(instance, block_devices, extra_args)
677   except errors.HypervisorError, err:
678     logging.exception("Failed to start instance")
679     return False
680
681   return True
682
683
684 def ShutdownInstance(instance):
685   """Shut an instance down.
686
687   Args:
688     instance - name of instance to shutdown.
689
690   """
691   running_instances = GetInstanceList()
692
693   if instance.name not in running_instances:
694     return True
695
696   hyper = hypervisor.GetHypervisor()
697   try:
698     hyper.StopInstance(instance)
699   except errors.HypervisorError, err:
700     logging.error("Failed to stop instance")
701     return False
702
703   # test every 10secs for 2min
704   shutdown_ok = False
705
706   time.sleep(1)
707   for dummy in range(11):
708     if instance.name not in GetInstanceList():
709       break
710     time.sleep(10)
711   else:
712     # the shutdown did not succeed
713     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
714
715     try:
716       hyper.StopInstance(instance, force=True)
717     except errors.HypervisorError, err:
718       logging.exception("Failed to stop instance")
719       return False
720
721     time.sleep(1)
722     if instance.name in GetInstanceList():
723       logging.error("could not shutdown instance '%s' even by destroy",
724                     instance.name)
725       return False
726
727   return True
728
729
730 def RebootInstance(instance, reboot_type, extra_args):
731   """Reboot an instance.
732
733   Args:
734     instance    - name of instance to reboot
735     reboot_type - how to reboot [soft,hard,full]
736
737   """
738   running_instances = GetInstanceList()
739
740   if instance.name not in running_instances:
741     logging.error("Cannot reboot instance that is not running")
742     return False
743
744   hyper = hypervisor.GetHypervisor()
745   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
746     try:
747       hyper.RebootInstance(instance)
748     except errors.HypervisorError, err:
749       logging.exception("Failed to soft reboot instance")
750       return False
751   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
752     try:
753       ShutdownInstance(instance)
754       StartInstance(instance, extra_args)
755     except errors.HypervisorError, err:
756       logging.exception("Failed to hard reboot instance")
757       return False
758   else:
759     raise errors.ParameterError("reboot_type invalid")
760
761
762   return True
763
764
765 def MigrateInstance(instance, target, live):
766   """Migrates an instance to another node.
767
768   """
769   hyper = hypervisor.GetHypervisor()
770
771   try:
772     hyper.MigrateInstance(instance, target, live)
773   except errors.HypervisorError, err:
774     msg = "Failed to migrate instance: %s" % str(err)
775     logging.error(msg)
776     return (False, msg)
777   return (True, "Migration successfull")
778
779
780 def CreateBlockDevice(disk, size, owner, on_primary, info):
781   """Creates a block device for an instance.
782
783   Args:
784    disk: a ganeti.objects.Disk object
785    size: the size of the physical underlying device
786    owner: a string with the name of the instance
787    on_primary: a boolean indicating if it is the primary node or not
788    info: string that will be sent to the physical device creation
789
790   Returns:
791     the new unique_id of the device (this can sometime be
792     computed only after creation), or None. On secondary nodes,
793     it's not required to return anything.
794
795   """
796   clist = []
797   if disk.children:
798     for child in disk.children:
799       crdev = _RecursiveAssembleBD(child, owner, on_primary)
800       if on_primary or disk.AssembleOnSecondary():
801         # we need the children open in case the device itself has to
802         # be assembled
803         crdev.Open()
804       clist.append(crdev)
805   try:
806     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
807     if device is not None:
808       logging.info("removing existing device %s", disk)
809       device.Remove()
810   except errors.BlockDeviceError, err:
811     pass
812
813   device = bdev.Create(disk.dev_type, disk.physical_id,
814                        clist, size)
815   if device is None:
816     raise ValueError("Can't create child device for %s, %s" %
817                      (disk, size))
818   if on_primary or disk.AssembleOnSecondary():
819     if not device.Assemble():
820       errorstring = "Can't assemble device after creation"
821       logging.error(errorstring)
822       raise errors.BlockDeviceError("%s, very unusual event - check the node"
823                                     " daemon logs" % errorstring)
824     device.SetSyncSpeed(constants.SYNC_SPEED)
825     if on_primary or disk.OpenOnSecondary():
826       device.Open(force=True)
827     DevCacheManager.UpdateCache(device.dev_path, owner,
828                                 on_primary, disk.iv_name)
829
830   device.SetInfo(info)
831
832   physical_id = device.unique_id
833   return physical_id
834
835
836 def RemoveBlockDevice(disk):
837   """Remove a block device.
838
839   This is intended to be called recursively.
840
841   """
842   try:
843     # since we are removing the device, allow a partial match
844     # this allows removal of broken mirrors
845     rdev = _RecursiveFindBD(disk, allow_partial=True)
846   except errors.BlockDeviceError, err:
847     # probably can't attach
848     logging.info("Can't attach to device %s in remove", disk)
849     rdev = None
850   if rdev is not None:
851     r_path = rdev.dev_path
852     result = rdev.Remove()
853     if result:
854       DevCacheManager.RemoveCache(r_path)
855   else:
856     result = True
857   if disk.children:
858     for child in disk.children:
859       result = result and RemoveBlockDevice(child)
860   return result
861
862
863 def _RecursiveAssembleBD(disk, owner, as_primary):
864   """Activate a block device for an instance.
865
866   This is run on the primary and secondary nodes for an instance.
867
868   This function is called recursively.
869
870   Args:
871     disk: a objects.Disk object
872     as_primary: if we should make the block device read/write
873
874   Returns:
875     the assembled device or None (in case no device was assembled)
876
877   If the assembly is not successful, an exception is raised.
878
879   """
880   children = []
881   if disk.children:
882     mcn = disk.ChildrenNeeded()
883     if mcn == -1:
884       mcn = 0 # max number of Nones allowed
885     else:
886       mcn = len(disk.children) - mcn # max number of Nones
887     for chld_disk in disk.children:
888       try:
889         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
890       except errors.BlockDeviceError, err:
891         if children.count(None) >= mcn:
892           raise
893         cdev = None
894         logging.debug("Error in child activation: %s", str(err))
895       children.append(cdev)
896
897   if as_primary or disk.AssembleOnSecondary():
898     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
899     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
900     result = r_dev
901     if as_primary or disk.OpenOnSecondary():
902       r_dev.Open()
903     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
904                                 as_primary, disk.iv_name)
905
906   else:
907     result = True
908   return result
909
910
911 def AssembleBlockDevice(disk, owner, as_primary):
912   """Activate a block device for an instance.
913
914   This is a wrapper over _RecursiveAssembleBD.
915
916   Returns:
917     a /dev path for primary nodes
918     True for secondary nodes
919
920   """
921   result = _RecursiveAssembleBD(disk, owner, as_primary)
922   if isinstance(result, bdev.BlockDev):
923     result = result.dev_path
924   return result
925
926
927 def ShutdownBlockDevice(disk):
928   """Shut down a block device.
929
930   First, if the device is assembled (can `Attach()`), then the device
931   is shutdown. Then the children of the device are shutdown.
932
933   This function is called recursively. Note that we don't cache the
934   children or such, as oppossed to assemble, shutdown of different
935   devices doesn't require that the upper device was active.
936
937   """
938   r_dev = _RecursiveFindBD(disk)
939   if r_dev is not None:
940     r_path = r_dev.dev_path
941     result = r_dev.Shutdown()
942     if result:
943       DevCacheManager.RemoveCache(r_path)
944   else:
945     result = True
946   if disk.children:
947     for child in disk.children:
948       result = result and ShutdownBlockDevice(child)
949   return result
950
951
952 def MirrorAddChildren(parent_cdev, new_cdevs):
953   """Extend a mirrored block device.
954
955   """
956   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
957   if parent_bdev is None:
958     logging.error("Can't find parent device")
959     return False
960   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
961   if new_bdevs.count(None) > 0:
962     logging.error("Can't find new device(s) to add: %s:%s",
963                   new_bdevs, new_cdevs)
964     return False
965   parent_bdev.AddChildren(new_bdevs)
966   return True
967
968
969 def MirrorRemoveChildren(parent_cdev, new_cdevs):
970   """Shrink a mirrored block device.
971
972   """
973   parent_bdev = _RecursiveFindBD(parent_cdev)
974   if parent_bdev is None:
975     logging.error("Can't find parent in remove children: %s", parent_cdev)
976     return False
977   devs = []
978   for disk in new_cdevs:
979     rpath = disk.StaticDevPath()
980     if rpath is None:
981       bd = _RecursiveFindBD(disk)
982       if bd is None:
983         logging.error("Can't find dynamic device %s while removing children",
984                       disk)
985         return False
986       else:
987         devs.append(bd.dev_path)
988     else:
989       devs.append(rpath)
990   parent_bdev.RemoveChildren(devs)
991   return True
992
993
994 def GetMirrorStatus(disks):
995   """Get the mirroring status of a list of devices.
996
997   Args:
998     disks: list of `objects.Disk`
999
1000   Returns:
1001     list of (mirror_done, estimated_time) tuples, which
1002     are the result of bdev.BlockDevice.CombinedSyncStatus()
1003
1004   """
1005   stats = []
1006   for dsk in disks:
1007     rbd = _RecursiveFindBD(dsk)
1008     if rbd is None:
1009       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1010     stats.append(rbd.CombinedSyncStatus())
1011   return stats
1012
1013
1014 def _RecursiveFindBD(disk, allow_partial=False):
1015   """Check if a device is activated.
1016
1017   If so, return informations about the real device.
1018
1019   Args:
1020     disk: the objects.Disk instance
1021     allow_partial: don't abort the find if a child of the
1022                    device can't be found; this is intended to be
1023                    used when repairing mirrors
1024
1025   Returns:
1026     None if the device can't be found
1027     otherwise the device instance
1028
1029   """
1030   children = []
1031   if disk.children:
1032     for chdisk in disk.children:
1033       children.append(_RecursiveFindBD(chdisk))
1034
1035   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1036
1037
1038 def FindBlockDevice(disk):
1039   """Check if a device is activated.
1040
1041   If so, return informations about the real device.
1042
1043   Args:
1044     disk: the objects.Disk instance
1045   Returns:
1046     None if the device can't be found
1047     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1048
1049   """
1050   rbd = _RecursiveFindBD(disk)
1051   if rbd is None:
1052     return rbd
1053   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1054
1055
1056 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1057   """Write a file to the filesystem.
1058
1059   This allows the master to overwrite(!) a file. It will only perform
1060   the operation if the file belongs to a list of configuration files.
1061
1062   """
1063   if not os.path.isabs(file_name):
1064     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1065                   file_name)
1066     return False
1067
1068   allowed_files = [
1069     constants.CLUSTER_CONF_FILE,
1070     constants.ETC_HOSTS,
1071     constants.SSH_KNOWN_HOSTS_FILE,
1072     constants.VNC_PASSWORD_FILE,
1073     ]
1074   allowed_files.extend(ssconf.SimpleStore().GetFileList())
1075
1076   if file_name not in allowed_files:
1077     logging.error("Filename passed to UploadFile not in allowed"
1078                  " upload targets: '%s'", file_name)
1079     return False
1080
1081   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1082                   atime=atime, mtime=mtime)
1083   return True
1084
1085
1086 def _ErrnoOrStr(err):
1087   """Format an EnvironmentError exception.
1088
1089   If the `err` argument has an errno attribute, it will be looked up
1090   and converted into a textual EXXXX description. Otherwise the string
1091   representation of the error will be returned.
1092
1093   """
1094   if hasattr(err, 'errno'):
1095     detail = errno.errorcode[err.errno]
1096   else:
1097     detail = str(err)
1098   return detail
1099
1100
1101 def _OSOndiskVersion(name, os_dir):
1102   """Compute and return the API version of a given OS.
1103
1104   This function will try to read the API version of the os given by
1105   the 'name' parameter and residing in the 'os_dir' directory.
1106
1107   Return value will be either an integer denoting the version or None in the
1108   case when this is not a valid OS name.
1109
1110   """
1111   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1112
1113   try:
1114     st = os.stat(api_file)
1115   except EnvironmentError, err:
1116     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1117                            " found (%s)" % _ErrnoOrStr(err))
1118
1119   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1120     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1121                            " a regular file")
1122
1123   try:
1124     f = open(api_file)
1125     try:
1126       api_version = f.read(256)
1127     finally:
1128       f.close()
1129   except EnvironmentError, err:
1130     raise errors.InvalidOS(name, os_dir, "error while reading the"
1131                            " API version (%s)" % _ErrnoOrStr(err))
1132
1133   api_version = api_version.strip()
1134   try:
1135     api_version = int(api_version)
1136   except (TypeError, ValueError), err:
1137     raise errors.InvalidOS(name, os_dir,
1138                            "API version is not integer (%s)" % str(err))
1139
1140   return api_version
1141
1142
1143 def DiagnoseOS(top_dirs=None):
1144   """Compute the validity for all OSes.
1145
1146   Returns an OS object for each name in all the given top directories
1147   (if not given defaults to constants.OS_SEARCH_PATH)
1148
1149   Returns:
1150     list of OS objects
1151
1152   """
1153   if top_dirs is None:
1154     top_dirs = constants.OS_SEARCH_PATH
1155
1156   result = []
1157   for dir_name in top_dirs:
1158     if os.path.isdir(dir_name):
1159       try:
1160         f_names = utils.ListVisibleFiles(dir_name)
1161       except EnvironmentError, err:
1162         logging.exception("Can't list the OS directory %s", dir_name)
1163         break
1164       for name in f_names:
1165         try:
1166           os_inst = OSFromDisk(name, base_dir=dir_name)
1167           result.append(os_inst)
1168         except errors.InvalidOS, err:
1169           result.append(objects.OS.FromInvalidOS(err))
1170
1171   return result
1172
1173
1174 def OSFromDisk(name, base_dir=None):
1175   """Create an OS instance from disk.
1176
1177   This function will return an OS instance if the given name is a
1178   valid OS name. Otherwise, it will raise an appropriate
1179   `errors.InvalidOS` exception, detailing why this is not a valid
1180   OS.
1181
1182   Args:
1183     os_dir: Directory containing the OS scripts. Defaults to a search
1184             in all the OS_SEARCH_PATH directories.
1185
1186   """
1187
1188   if base_dir is None:
1189     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1190     if os_dir is None:
1191       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1192   else:
1193     os_dir = os.path.sep.join([base_dir, name])
1194
1195   api_version = _OSOndiskVersion(name, os_dir)
1196
1197   if api_version != constants.OS_API_VERSION:
1198     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1199                            " (found %s want %s)"
1200                            % (api_version, constants.OS_API_VERSION))
1201
1202   # OS Scripts dictionary, we will populate it with the actual script names
1203   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1204
1205   for script in os_scripts:
1206     os_scripts[script] = os.path.sep.join([os_dir, script])
1207
1208     try:
1209       st = os.stat(os_scripts[script])
1210     except EnvironmentError, err:
1211       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1212                              (script, _ErrnoOrStr(err)))
1213
1214     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1215       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1216                              script)
1217
1218     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1219       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1220                              script)
1221
1222
1223   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1224                     create_script=os_scripts['create'],
1225                     export_script=os_scripts['export'],
1226                     import_script=os_scripts['import'],
1227                     rename_script=os_scripts['rename'],
1228                     api_version=api_version)
1229
1230
1231 def GrowBlockDevice(disk, amount):
1232   """Grow a stack of block devices.
1233
1234   This function is called recursively, with the childrens being the
1235   first one resize.
1236
1237   Args:
1238     disk: the disk to be grown
1239
1240   Returns: a tuple of (status, result), with:
1241     status: the result (true/false) of the operation
1242     result: the error message if the operation failed, otherwise not used
1243
1244   """
1245   r_dev = _RecursiveFindBD(disk)
1246   if r_dev is None:
1247     return False, "Cannot find block device %s" % (disk,)
1248
1249   try:
1250     r_dev.Grow(amount)
1251   except errors.BlockDeviceError, err:
1252     return False, str(err)
1253
1254   return True, None
1255
1256
1257 def SnapshotBlockDevice(disk):
1258   """Create a snapshot copy of a block device.
1259
1260   This function is called recursively, and the snapshot is actually created
1261   just for the leaf lvm backend device.
1262
1263   Args:
1264     disk: the disk to be snapshotted
1265
1266   Returns:
1267     a config entry for the actual lvm device snapshotted.
1268
1269   """
1270   if disk.children:
1271     if len(disk.children) == 1:
1272       # only one child, let's recurse on it
1273       return SnapshotBlockDevice(disk.children[0])
1274     else:
1275       # more than one child, choose one that matches
1276       for child in disk.children:
1277         if child.size == disk.size:
1278           # return implies breaking the loop
1279           return SnapshotBlockDevice(child)
1280   elif disk.dev_type == constants.LD_LV:
1281     r_dev = _RecursiveFindBD(disk)
1282     if r_dev is not None:
1283       # let's stay on the safe side and ask for the full size, for now
1284       return r_dev.Snapshot(disk.size)
1285     else:
1286       return None
1287   else:
1288     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1289                                  " '%s' of type '%s'" %
1290                                  (disk.unique_id, disk.dev_type))
1291
1292
1293 def ExportSnapshot(disk, dest_node, instance):
1294   """Export a block device snapshot to a remote node.
1295
1296   Args:
1297     disk: the snapshot block device
1298     dest_node: the node to send the image to
1299     instance: instance being exported
1300
1301   Returns:
1302     True if successful, False otherwise.
1303
1304   """
1305   inst_os = OSFromDisk(instance.os)
1306   export_script = inst_os.export_script
1307
1308   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1309                                      instance.name, int(time.time()))
1310   if not os.path.exists(constants.LOG_OS_DIR):
1311     os.mkdir(constants.LOG_OS_DIR, 0750)
1312
1313   real_os_dev = _RecursiveFindBD(disk)
1314   if real_os_dev is None:
1315     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1316                                   str(disk))
1317   real_os_dev.Open()
1318
1319   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1320   destfile = disk.physical_id[1]
1321
1322   # the target command is built out of three individual commands,
1323   # which are joined by pipes; we check each individual command for
1324   # valid parameters
1325
1326   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1327                                export_script, instance.name,
1328                                real_os_dev.dev_path, logfile)
1329
1330   comprcmd = "gzip"
1331
1332   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1333                                 destdir, destdir, destfile)
1334   remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1335                                        destcmd)
1336
1337   # all commands have been checked, so we're safe to combine them
1338   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1339
1340   result = utils.RunCmd(command)
1341
1342   if result.failed:
1343     logging.error("os snapshot export command '%s' returned error: %s"
1344                   " output: %s", command, result.fail_reason, result.output)
1345     return False
1346
1347   return True
1348
1349
1350 def FinalizeExport(instance, snap_disks):
1351   """Write out the export configuration information.
1352
1353   Args:
1354     instance: instance configuration
1355     snap_disks: snapshot block devices
1356
1357   Returns:
1358     False in case of error, True otherwise.
1359
1360   """
1361   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1362   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1363
1364   config = objects.SerializableConfigParser()
1365
1366   config.add_section(constants.INISECT_EXP)
1367   config.set(constants.INISECT_EXP, 'version', '0')
1368   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1369   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1370   config.set(constants.INISECT_EXP, 'os', instance.os)
1371   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1372
1373   config.add_section(constants.INISECT_INS)
1374   config.set(constants.INISECT_INS, 'name', instance.name)
1375   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1376   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1377   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1378
1379   nic_count = 0
1380   for nic_count, nic in enumerate(instance.nics):
1381     config.set(constants.INISECT_INS, 'nic%d_mac' %
1382                nic_count, '%s' % nic.mac)
1383     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1384     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1385                '%s' % nic.bridge)
1386   # TODO: redundant: on load can read nics until it doesn't exist
1387   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1388
1389   disk_count = 0
1390   for disk_count, disk in enumerate(snap_disks):
1391     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1392                ('%s' % disk.iv_name))
1393     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1394                ('%s' % disk.physical_id[1]))
1395     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1396                ('%d' % disk.size))
1397   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1398
1399   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1400   cfo = open(cff, 'w')
1401   try:
1402     config.write(cfo)
1403   finally:
1404     cfo.close()
1405
1406   shutil.rmtree(finaldestdir, True)
1407   shutil.move(destdir, finaldestdir)
1408
1409   return True
1410
1411
1412 def ExportInfo(dest):
1413   """Get export configuration information.
1414
1415   Args:
1416     dest: directory containing the export
1417
1418   Returns:
1419     A serializable config file containing the export info.
1420
1421   """
1422   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1423
1424   config = objects.SerializableConfigParser()
1425   config.read(cff)
1426
1427   if (not config.has_section(constants.INISECT_EXP) or
1428       not config.has_section(constants.INISECT_INS)):
1429     return None
1430
1431   return config
1432
1433
1434 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1435   """Import an os image into an instance.
1436
1437   Args:
1438     instance: the instance object
1439     os_disk: the instance-visible name of the os device
1440     swap_disk: the instance-visible name of the swap device
1441     src_node: node holding the source image
1442     src_image: path to the source image on src_node
1443
1444   Returns:
1445     False in case of error, True otherwise.
1446
1447   """
1448   inst_os = OSFromDisk(instance.os)
1449   import_script = inst_os.import_script
1450
1451   os_device = instance.FindDisk(os_disk)
1452   if os_device is None:
1453     logging.error("Can't find this device-visible name '%s'", os_disk)
1454     return False
1455
1456   swap_device = instance.FindDisk(swap_disk)
1457   if swap_device is None:
1458     logging.error("Can't find this device-visible name '%s'", swap_disk)
1459     return False
1460
1461   real_os_dev = _RecursiveFindBD(os_device)
1462   if real_os_dev is None:
1463     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1464                                   str(os_device))
1465   real_os_dev.Open()
1466
1467   real_swap_dev = _RecursiveFindBD(swap_device)
1468   if real_swap_dev is None:
1469     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1470                                   str(swap_device))
1471   real_swap_dev.Open()
1472
1473   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1474                                         instance.name, int(time.time()))
1475   if not os.path.exists(constants.LOG_OS_DIR):
1476     os.mkdir(constants.LOG_OS_DIR, 0750)
1477
1478   destcmd = utils.BuildShellCmd('cat %s', src_image)
1479   remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
1480                                        destcmd)
1481
1482   comprcmd = "gunzip"
1483   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1484                                inst_os.path, import_script, instance.name,
1485                                real_os_dev.dev_path, real_swap_dev.dev_path,
1486                                logfile)
1487
1488   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1489
1490   result = utils.RunCmd(command)
1491
1492   if result.failed:
1493     logging.error("os import command '%s' returned error: %s"
1494                   " output: %s", command, result.fail_reason, result.output)
1495     return False
1496
1497   return True
1498
1499
1500 def ListExports():
1501   """Return a list of exports currently available on this machine.
1502
1503   """
1504   if os.path.isdir(constants.EXPORT_DIR):
1505     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1506   else:
1507     return []
1508
1509
1510 def RemoveExport(export):
1511   """Remove an existing export from the node.
1512
1513   Args:
1514     export: the name of the export to remove
1515
1516   Returns:
1517     False in case of error, True otherwise.
1518
1519   """
1520   target = os.path.join(constants.EXPORT_DIR, export)
1521
1522   shutil.rmtree(target)
1523   # TODO: catch some of the relevant exceptions and provide a pretty
1524   # error message if rmtree fails.
1525
1526   return True
1527
1528
1529 def RenameBlockDevices(devlist):
1530   """Rename a list of block devices.
1531
1532   The devlist argument is a list of tuples (disk, new_logical,
1533   new_physical). The return value will be a combined boolean result
1534   (True only if all renames succeeded).
1535
1536   """
1537   result = True
1538   for disk, unique_id in devlist:
1539     dev = _RecursiveFindBD(disk)
1540     if dev is None:
1541       result = False
1542       continue
1543     try:
1544       old_rpath = dev.dev_path
1545       dev.Rename(unique_id)
1546       new_rpath = dev.dev_path
1547       if old_rpath != new_rpath:
1548         DevCacheManager.RemoveCache(old_rpath)
1549         # FIXME: we should add the new cache information here, like:
1550         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1551         # but we don't have the owner here - maybe parse from existing
1552         # cache? for now, we only lose lvm data when we rename, which
1553         # is less critical than DRBD or MD
1554     except errors.BlockDeviceError, err:
1555       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1556       result = False
1557   return result
1558
1559
1560 def _TransformFileStorageDir(file_storage_dir):
1561   """Checks whether given file_storage_dir is valid.
1562
1563   Checks wheter the given file_storage_dir is within the cluster-wide
1564   default file_storage_dir stored in SimpleStore. Only paths under that
1565   directory are allowed.
1566
1567   Args:
1568     file_storage_dir: string with path
1569
1570   Returns:
1571     normalized file_storage_dir (string) if valid, None otherwise
1572
1573   """
1574   file_storage_dir = os.path.normpath(file_storage_dir)
1575   base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
1576   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1577       base_file_storage_dir):
1578     logging.error("file storage directory '%s' is not under base file"
1579                   " storage directory '%s'",
1580                   file_storage_dir, base_file_storage_dir)
1581     return None
1582   return file_storage_dir
1583
1584
1585 def CreateFileStorageDir(file_storage_dir):
1586   """Create file storage directory.
1587
1588   Args:
1589     file_storage_dir: string containing the path
1590
1591   Returns:
1592     tuple with first element a boolean indicating wheter dir
1593     creation was successful or not
1594
1595   """
1596   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1597   result = True,
1598   if not file_storage_dir:
1599     result = False,
1600   else:
1601     if os.path.exists(file_storage_dir):
1602       if not os.path.isdir(file_storage_dir):
1603         logging.error("'%s' is not a directory", file_storage_dir)
1604         result = False,
1605     else:
1606       try:
1607         os.makedirs(file_storage_dir, 0750)
1608       except OSError, err:
1609         logging.error("Cannot create file storage directory '%s': %s",
1610                       file_storage_dir, err)
1611         result = False,
1612   return result
1613
1614
1615 def RemoveFileStorageDir(file_storage_dir):
1616   """Remove file storage directory.
1617
1618   Remove it only if it's empty. If not log an error and return.
1619
1620   Args:
1621     file_storage_dir: string containing the path
1622
1623   Returns:
1624     tuple with first element a boolean indicating wheter dir
1625     removal was successful or not
1626
1627   """
1628   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1629   result = True,
1630   if not file_storage_dir:
1631     result = False,
1632   else:
1633     if os.path.exists(file_storage_dir):
1634       if not os.path.isdir(file_storage_dir):
1635         logging.error("'%s' is not a directory", file_storage_dir)
1636         result = False,
1637       # deletes dir only if empty, otherwise we want to return False
1638       try:
1639         os.rmdir(file_storage_dir)
1640       except OSError, err:
1641         logging.exception("Cannot remove file storage directory '%s'",
1642                           file_storage_dir)
1643         result = False,
1644   return result
1645
1646
1647 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1648   """Rename the file storage directory.
1649
1650   Args:
1651     old_file_storage_dir: string containing the old path
1652     new_file_storage_dir: string containing the new path
1653
1654   Returns:
1655     tuple with first element a boolean indicating wheter dir
1656     rename was successful or not
1657
1658   """
1659   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1660   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1661   result = True,
1662   if not old_file_storage_dir or not new_file_storage_dir:
1663     result = False,
1664   else:
1665     if not os.path.exists(new_file_storage_dir):
1666       if os.path.isdir(old_file_storage_dir):
1667         try:
1668           os.rename(old_file_storage_dir, new_file_storage_dir)
1669         except OSError, err:
1670           logging.exception("Cannot rename '%s' to '%s'",
1671                             old_file_storage_dir, new_file_storage_dir)
1672           result =  False,
1673       else:
1674         logging.error("'%s' is not a directory", old_file_storage_dir)
1675         result = False,
1676     else:
1677       if os.path.exists(old_file_storage_dir):
1678         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1679                       old_file_storage_dir, new_file_storage_dir)
1680         result = False,
1681   return result
1682
1683
1684 def _IsJobQueueFile(file_name):
1685   """Checks whether the given filename is in the queue directory.
1686
1687   """
1688   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1689   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1690
1691   if not result:
1692     logging.error("'%s' is not a file in the queue directory",
1693                   file_name)
1694
1695   return result
1696
1697
1698 def JobQueueUpdate(file_name, content):
1699   """Updates a file in the queue directory.
1700
1701   """
1702   if not _IsJobQueueFile(file_name):
1703     return False
1704
1705   # Write and replace the file atomically
1706   utils.WriteFile(file_name, data=content)
1707
1708   return True
1709
1710
1711 def JobQueuePurge():
1712   """Removes job queue files and archived jobs
1713
1714   """
1715   # The lock must not be removed, otherwise another process could create
1716   # it again.
1717   return _JobQueuePurge(keep_lock=True)
1718
1719
1720 def JobQueueRename(old, new):
1721   """Renames a job queue file.
1722
1723   """
1724   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1725     return False
1726
1727   os.rename(old, new)
1728
1729   return True
1730
1731
1732 def CloseBlockDevices(disks):
1733   """Closes the given block devices.
1734
1735   This means they will be switched to secondary mode (in case of DRBD).
1736
1737   """
1738   bdevs = []
1739   for cf in disks:
1740     rd = _RecursiveFindBD(cf)
1741     if rd is None:
1742       return (False, "Can't find device %s" % cf)
1743     bdevs.append(rd)
1744
1745   msg = []
1746   for rd in bdevs:
1747     try:
1748       rd.Close()
1749     except errors.BlockDeviceError, err:
1750       msg.append(str(err))
1751   if msg:
1752     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1753   else:
1754     return (True, "All devices secondary")
1755
1756
1757 class HooksRunner(object):
1758   """Hook runner.
1759
1760   This class is instantiated on the node side (ganeti-noded) and not on
1761   the master side.
1762
1763   """
1764   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1765
1766   def __init__(self, hooks_base_dir=None):
1767     """Constructor for hooks runner.
1768
1769     Args:
1770       - hooks_base_dir: if not None, this overrides the
1771         constants.HOOKS_BASE_DIR (useful for unittests)
1772
1773     """
1774     if hooks_base_dir is None:
1775       hooks_base_dir = constants.HOOKS_BASE_DIR
1776     self._BASE_DIR = hooks_base_dir
1777
1778   @staticmethod
1779   def ExecHook(script, env):
1780     """Exec one hook script.
1781
1782     Args:
1783      - script: the full path to the script
1784      - env: the environment with which to exec the script
1785
1786     """
1787     # exec the process using subprocess and log the output
1788     fdstdin = None
1789     try:
1790       fdstdin = open("/dev/null", "r")
1791       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1792                                stderr=subprocess.STDOUT, close_fds=True,
1793                                shell=False, cwd="/", env=env)
1794       output = ""
1795       try:
1796         output = child.stdout.read(4096)
1797         child.stdout.close()
1798       except EnvironmentError, err:
1799         output += "Hook script error: %s" % str(err)
1800
1801       while True:
1802         try:
1803           result = child.wait()
1804           break
1805         except EnvironmentError, err:
1806           if err.errno == errno.EINTR:
1807             continue
1808           raise
1809     finally:
1810       # try not to leak fds
1811       for fd in (fdstdin, ):
1812         if fd is not None:
1813           try:
1814             fd.close()
1815           except EnvironmentError, err:
1816             # just log the error
1817             #logging.exception("Error while closing fd %s", fd)
1818             pass
1819
1820     return result == 0, output
1821
1822   def RunHooks(self, hpath, phase, env):
1823     """Run the scripts in the hooks directory.
1824
1825     This method will not be usually overriden by child opcodes.
1826
1827     """
1828     if phase == constants.HOOKS_PHASE_PRE:
1829       suffix = "pre"
1830     elif phase == constants.HOOKS_PHASE_POST:
1831       suffix = "post"
1832     else:
1833       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1834     rr = []
1835
1836     subdir = "%s-%s.d" % (hpath, suffix)
1837     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1838     try:
1839       dir_contents = utils.ListVisibleFiles(dir_name)
1840     except OSError, err:
1841       # must log
1842       return rr
1843
1844     # we use the standard python sort order,
1845     # so 00name is the recommended naming scheme
1846     dir_contents.sort()
1847     for relname in dir_contents:
1848       fname = os.path.join(dir_name, relname)
1849       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1850           self.RE_MASK.match(relname) is not None):
1851         rrval = constants.HKR_SKIP
1852         output = ""
1853       else:
1854         result, output = self.ExecHook(fname, env)
1855         if not result:
1856           rrval = constants.HKR_FAIL
1857         else:
1858           rrval = constants.HKR_SUCCESS
1859       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1860
1861     return rr
1862
1863
1864 class IAllocatorRunner(object):
1865   """IAllocator runner.
1866
1867   This class is instantiated on the node side (ganeti-noded) and not on
1868   the master side.
1869
1870   """
1871   def Run(self, name, idata):
1872     """Run an iallocator script.
1873
1874     Return value: tuple of:
1875        - run status (one of the IARUN_ constants)
1876        - stdout
1877        - stderr
1878        - fail reason (as from utils.RunResult)
1879
1880     """
1881     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1882                                   os.path.isfile)
1883     if alloc_script is None:
1884       return (constants.IARUN_NOTFOUND, None, None, None)
1885
1886     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1887     try:
1888       os.write(fd, idata)
1889       os.close(fd)
1890       result = utils.RunCmd([alloc_script, fin_name])
1891       if result.failed:
1892         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1893                 result.fail_reason)
1894     finally:
1895       os.unlink(fin_name)
1896
1897     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1898
1899
1900 class DevCacheManager(object):
1901   """Simple class for managing a cache of block device information.
1902
1903   """
1904   _DEV_PREFIX = "/dev/"
1905   _ROOT_DIR = constants.BDEV_CACHE_DIR
1906
1907   @classmethod
1908   def _ConvertPath(cls, dev_path):
1909     """Converts a /dev/name path to the cache file name.
1910
1911     This replaces slashes with underscores and strips the /dev
1912     prefix. It then returns the full path to the cache file
1913
1914     """
1915     if dev_path.startswith(cls._DEV_PREFIX):
1916       dev_path = dev_path[len(cls._DEV_PREFIX):]
1917     dev_path = dev_path.replace("/", "_")
1918     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1919     return fpath
1920
1921   @classmethod
1922   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1923     """Updates the cache information for a given device.
1924
1925     """
1926     if dev_path is None:
1927       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1928       return
1929     fpath = cls._ConvertPath(dev_path)
1930     if on_primary:
1931       state = "primary"
1932     else:
1933       state = "secondary"
1934     if iv_name is None:
1935       iv_name = "not_visible"
1936     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1937     try:
1938       utils.WriteFile(fpath, data=fdata)
1939     except EnvironmentError, err:
1940       logging.exception("Can't update bdev cache for %s", dev_path)
1941
1942   @classmethod
1943   def RemoveCache(cls, dev_path):
1944     """Remove data for a dev_path.
1945
1946     """
1947     if dev_path is None:
1948       logging.error("DevCacheManager.RemoveCache got a None dev_path")
1949       return
1950     fpath = cls._ConvertPath(dev_path)
1951     try:
1952       utils.RemoveFile(fpath)
1953     except EnvironmentError, err:
1954       logging.exception("Can't update bdev cache for %s", dev_path)