rpc.call_instance_migrate: pass the whole instance
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetConfig():
48   return ssconf.SimpleConfigReader()
49
50
51 def _GetSshRunner(cluster_name):
52   return ssh.SshRunner(cluster_name)
53
54
55 def _CleanDirectory(path, exclude=[]):
56   """Removes all regular files in a directory.
57
58   @param exclude: List of files to be excluded.
59   @type exclude: list
60
61   """
62   if not os.path.isdir(path):
63     return
64
65   # Normalize excluded paths
66   exclude = [os.path.normpath(i) for i in exclude]
67
68   for rel_name in utils.ListVisibleFiles(path):
69     full_name = os.path.normpath(os.path.join(path, rel_name))
70     if full_name in exclude:
71       continue
72     if os.path.isfile(full_name) and not os.path.islink(full_name):
73       utils.RemoveFile(full_name)
74
75
76 def JobQueuePurge():
77   """Removes job queue files and archived jobs
78
79   """
80   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82
83
84 def GetMasterInfo():
85   """Returns master information.
86
87   This is an utility function to compute master information, either
88   for consumption here or from the node daemon.
89
90   @rtype: tuple
91   @return: (master_netdev, master_ip, master_name)
92
93   """
94   try:
95     cfg = _GetConfig()
96     master_netdev = cfg.GetMasterNetdev()
97     master_ip = cfg.GetMasterIP()
98     master_node = cfg.GetMasterNode()
99   except errors.ConfigurationError, err:
100     logging.exception("Cluster configuration incomplete")
101     return (None, None)
102   return (master_netdev, master_ip, master_node)
103
104
105 def StartMaster(start_daemons):
106   """Activate local node as master node.
107
108   The function will always try activate the IP address of the master
109   (if someone else has it, then it won't). Then, if the start_daemons
110   parameter is True, it will also start the master daemons
111   (ganet-masterd and ganeti-rapi).
112
113   """
114   ok = True
115   master_netdev, master_ip, _ = GetMasterInfo()
116   if not master_netdev:
117     return False
118
119   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120     if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
121                      source=constants.LOCALHOST_IP_ADDRESS):
122       # we already have the ip:
123       logging.debug("Already started")
124     else:
125       logging.error("Someone else has the master ip, not activating")
126       ok = False
127   else:
128     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
129                            "dev", master_netdev, "label",
130                            "%s:0" % master_netdev])
131     if result.failed:
132       logging.error("Can't activate master IP: %s", result.output)
133       ok = False
134
135     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
136                            "-s", master_ip, master_ip])
137     # we'll ignore the exit code of arping
138
139   # and now start the master and rapi daemons
140   if start_daemons:
141     for daemon in 'ganeti-masterd', 'ganeti-rapi':
142       result = utils.RunCmd([daemon])
143       if result.failed:
144         logging.error("Can't start daemon %s: %s", daemon, result.output)
145         ok = False
146   return ok
147
148
149 def StopMaster(stop_daemons):
150   """Deactivate this node as master.
151
152   The function will always try to deactivate the IP address of the
153   master. Then, if the stop_daemons parameter is True, it will also
154   stop the master daemons (ganet-masterd and ganeti-rapi).
155
156   """
157   master_netdev, master_ip, _ = GetMasterInfo()
158   if not master_netdev:
159     return False
160
161   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
162                          "dev", master_netdev])
163   if result.failed:
164     logging.error("Can't remove the master IP, error: %s", result.output)
165     # but otherwise ignore the failure
166
167   if stop_daemons:
168     # stop/kill the rapi and the master daemon
169     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
170       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
171
172   return True
173
174
175 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
176   """Joins this node to the cluster.
177
178   This does the following:
179       - updates the hostkeys of the machine (rsa and dsa)
180       - adds the ssh private key to the user
181       - adds the ssh public key to the users' authorized_keys file
182
183   """
184   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
185                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
186                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
187                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
188   for name, content, mode in sshd_keys:
189     utils.WriteFile(name, data=content, mode=mode)
190
191   try:
192     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
193                                                     mkdir=True)
194   except errors.OpExecError, err:
195     logging.exception("Error while processing user ssh files")
196     return False
197
198   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
199     utils.WriteFile(name, data=content, mode=0600)
200
201   utils.AddAuthorizedKey(auth_keys, sshpub)
202
203   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
204
205   return True
206
207
208 def LeaveCluster():
209   """Cleans up the current node and prepares it to be removed from the cluster.
210
211   """
212   _CleanDirectory(constants.DATA_DIR)
213   JobQueuePurge()
214
215   try:
216     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
217   except errors.OpExecError:
218     logging.exception("Error while processing ssh files")
219     return
220
221   f = open(pub_key, 'r')
222   try:
223     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
224   finally:
225     f.close()
226
227   utils.RemoveFile(priv_key)
228   utils.RemoveFile(pub_key)
229
230   # Return a reassuring string to the caller, and quit
231   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
232
233
234 def GetNodeInfo(vgname):
235   """Gives back a hash with different informations about the node.
236
237   Returns:
238     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
239       'memory_free' : xxx, 'memory_total' : xxx }
240     where
241     vg_size is the size of the configured volume group in MiB
242     vg_free is the free size of the volume group in MiB
243     memory_dom0 is the memory allocated for domain0 in MiB
244     memory_free is the currently available (free) ram in MiB
245     memory_total is the total number of ram in MiB
246
247   """
248   outputarray = {}
249   vginfo = _GetVGInfo(vgname)
250   outputarray['vg_size'] = vginfo['vg_size']
251   outputarray['vg_free'] = vginfo['vg_free']
252
253   hyper = hypervisor.GetHypervisor(_GetConfig())
254   hyp_info = hyper.GetNodeInfo()
255   if hyp_info is not None:
256     outputarray.update(hyp_info)
257
258   f = open("/proc/sys/kernel/random/boot_id", 'r')
259   try:
260     outputarray["bootid"] = f.read(128).rstrip("\n")
261   finally:
262     f.close()
263
264   return outputarray
265
266
267 def VerifyNode(what, cluster_name):
268   """Verify the status of the local node.
269
270   Args:
271     what - a dictionary of things to check:
272       'filelist' : list of files for which to compute checksums
273       'nodelist' : list of nodes we should check communication with
274       'hypervisor': run the hypervisor-specific verify
275
276   Requested files on local node are checksummed and the result returned.
277
278   The nodelist is traversed, with the following checks being made
279   for each node:
280   - known_hosts key correct
281   - correct resolving of node name (target node returns its own hostname
282     by ssh-execution of 'hostname', result compared against name in list.
283
284   """
285   result = {}
286
287   if 'hypervisor' in what:
288     result['hypervisor'] = hypervisor.GetHypervisor(_GetConfig()).Verify()
289
290   if 'filelist' in what:
291     result['filelist'] = utils.FingerprintFiles(what['filelist'])
292
293   if 'nodelist' in what:
294     result['nodelist'] = {}
295     random.shuffle(what['nodelist'])
296     for node in what['nodelist']:
297       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
298       if not success:
299         result['nodelist'][node] = message
300   if 'node-net-test' in what:
301     result['node-net-test'] = {}
302     my_name = utils.HostInfo().name
303     my_pip = my_sip = None
304     for name, pip, sip in what['node-net-test']:
305       if name == my_name:
306         my_pip = pip
307         my_sip = sip
308         break
309     if not my_pip:
310       result['node-net-test'][my_name] = ("Can't find my own"
311                                           " primary/secondary IP"
312                                           " in the node list")
313     else:
314       port = utils.GetNodeDaemonPort()
315       for name, pip, sip in what['node-net-test']:
316         fail = []
317         if not utils.TcpPing(pip, port, source=my_pip):
318           fail.append("primary")
319         if sip != pip:
320           if not utils.TcpPing(sip, port, source=my_sip):
321             fail.append("secondary")
322         if fail:
323           result['node-net-test'][name] = ("failure using the %s"
324                                            " interface(s)" %
325                                            " and ".join(fail))
326
327   return result
328
329
330 def GetVolumeList(vg_name):
331   """Compute list of logical volumes and their size.
332
333   Returns:
334     dictionary of all partions (key) with their size (in MiB), inactive
335     and online status:
336     {'test1': ('20.06', True, True)}
337
338   """
339   lvs = {}
340   sep = '|'
341   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
342                          "--separator=%s" % sep,
343                          "-olv_name,lv_size,lv_attr", vg_name])
344   if result.failed:
345     logging.error("Failed to list logical volumes, lvs output: %s",
346                   result.output)
347     return result.output
348
349   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
350   for line in result.stdout.splitlines():
351     line = line.strip()
352     match = valid_line_re.match(line)
353     if not match:
354       logging.error("Invalid line returned from lvs output: '%s'", line)
355       continue
356     name, size, attr = match.groups()
357     inactive = attr[4] == '-'
358     online = attr[5] == 'o'
359     lvs[name] = (size, inactive, online)
360
361   return lvs
362
363
364 def ListVolumeGroups():
365   """List the volume groups and their size.
366
367   Returns:
368     Dictionary with keys volume name and values the size of the volume
369
370   """
371   return utils.ListVolumeGroups()
372
373
374 def NodeVolumes():
375   """List all volumes on this node.
376
377   """
378   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
379                          "--separator=|",
380                          "--options=lv_name,lv_size,devices,vg_name"])
381   if result.failed:
382     logging.error("Failed to list logical volumes, lvs output: %s",
383                   result.output)
384     return {}
385
386   def parse_dev(dev):
387     if '(' in dev:
388       return dev.split('(')[0]
389     else:
390       return dev
391
392   def map_line(line):
393     return {
394       'name': line[0].strip(),
395       'size': line[1].strip(),
396       'dev': parse_dev(line[2].strip()),
397       'vg': line[3].strip(),
398     }
399
400   return [map_line(line.split('|')) for line in result.stdout.splitlines()
401           if line.count('|') >= 3]
402
403
404 def BridgesExist(bridges_list):
405   """Check if a list of bridges exist on the current node.
406
407   Returns:
408     True if all of them exist, false otherwise
409
410   """
411   for bridge in bridges_list:
412     if not utils.BridgeExists(bridge):
413       return False
414
415   return True
416
417
418 def GetInstanceList():
419   """Provides a list of instances.
420
421   Returns:
422     A list of all running instances on the current node
423     - instance1.example.com
424     - instance2.example.com
425
426   """
427   try:
428     names = hypervisor.GetHypervisor(_GetConfig()).ListInstances()
429   except errors.HypervisorError, err:
430     logging.exception("Error enumerating instances")
431     raise
432
433   return names
434
435
436 def GetInstanceInfo(instance):
437   """Gives back the informations about an instance as a dictionary.
438
439   Args:
440     instance: name of the instance (ex. instance1.example.com)
441
442   Returns:
443     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
444     where
445     memory: memory size of instance (int)
446     state: xen state of instance (string)
447     time: cpu time of instance (float)
448
449   """
450   output = {}
451
452   iinfo = hypervisor.GetHypervisor(_GetConfig()).GetInstanceInfo(instance)
453   if iinfo is not None:
454     output['memory'] = iinfo[2]
455     output['state'] = iinfo[4]
456     output['time'] = iinfo[5]
457
458   return output
459
460
461 def GetAllInstancesInfo():
462   """Gather data about all instances.
463
464   This is the equivalent of `GetInstanceInfo()`, except that it
465   computes data for all instances at once, thus being faster if one
466   needs data about more than one instance.
467
468   Returns: a dictionary of dictionaries, keys being the instance name,
469     and with values:
470     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
471     where
472     memory: memory size of instance (int)
473     state: xen state of instance (string)
474     time: cpu time of instance (float)
475     vcpus: the number of cpus
476
477   """
478   output = {}
479
480   iinfo = hypervisor.GetHypervisor(_GetConfig()).GetAllInstancesInfo()
481   if iinfo:
482     for name, inst_id, memory, vcpus, state, times in iinfo:
483       output[name] = {
484         'memory': memory,
485         'vcpus': vcpus,
486         'state': state,
487         'time': times,
488         }
489
490   return output
491
492
493 def AddOSToInstance(instance, os_disk, swap_disk):
494   """Add an OS to an instance.
495
496   Args:
497     instance: the instance object
498     os_disk: the instance-visible name of the os device
499     swap_disk: the instance-visible name of the swap device
500
501   """
502   cfg = _GetConfig()
503   inst_os = OSFromDisk(instance.os)
504
505   create_script = inst_os.create_script
506
507   os_device = instance.FindDisk(os_disk)
508   if os_device is None:
509     logging.error("Can't find this device-visible name '%s'", os_disk)
510     return False
511
512   swap_device = instance.FindDisk(swap_disk)
513   if swap_device is None:
514     logging.error("Can't find this device-visible name '%s'", swap_disk)
515     return False
516
517   real_os_dev = _RecursiveFindBD(os_device)
518   if real_os_dev is None:
519     raise errors.BlockDeviceError("Block device '%s' is not set up" %
520                                   str(os_device))
521   real_os_dev.Open()
522
523   real_swap_dev = _RecursiveFindBD(swap_device)
524   if real_swap_dev is None:
525     raise errors.BlockDeviceError("Block device '%s' is not set up" %
526                                   str(swap_device))
527   real_swap_dev.Open()
528
529   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
530                                      instance.name, int(time.time()))
531   if not os.path.exists(constants.LOG_OS_DIR):
532     os.mkdir(constants.LOG_OS_DIR, 0750)
533
534   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
535                                 inst_os.path, create_script, instance.name,
536                                 real_os_dev.dev_path, real_swap_dev.dev_path,
537                                 logfile)
538   env = {'HYPERVISOR': cfg.GetHypervisorType()}
539
540   result = utils.RunCmd(command, env=env)
541   if result.failed:
542     logging.error("os create command '%s' returned error: %s, logfile: %s,"
543                   " output: %s", command, result.fail_reason, logfile,
544                   result.output)
545     return False
546
547   return True
548
549
550 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
551   """Run the OS rename script for an instance.
552
553   Args:
554     instance: the instance object
555     old_name: the old name of the instance
556     os_disk: the instance-visible name of the os device
557     swap_disk: the instance-visible name of the swap device
558
559   """
560   inst_os = OSFromDisk(instance.os)
561
562   script = inst_os.rename_script
563
564   os_device = instance.FindDisk(os_disk)
565   if os_device is None:
566     logging.error("Can't find this device-visible name '%s'", os_disk)
567     return False
568
569   swap_device = instance.FindDisk(swap_disk)
570   if swap_device is None:
571     logging.error("Can't find this device-visible name '%s'", swap_disk)
572     return False
573
574   real_os_dev = _RecursiveFindBD(os_device)
575   if real_os_dev is None:
576     raise errors.BlockDeviceError("Block device '%s' is not set up" %
577                                   str(os_device))
578   real_os_dev.Open()
579
580   real_swap_dev = _RecursiveFindBD(swap_device)
581   if real_swap_dev is None:
582     raise errors.BlockDeviceError("Block device '%s' is not set up" %
583                                   str(swap_device))
584   real_swap_dev.Open()
585
586   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
587                                            old_name,
588                                            instance.name, int(time.time()))
589   if not os.path.exists(constants.LOG_OS_DIR):
590     os.mkdir(constants.LOG_OS_DIR, 0750)
591
592   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
593                                 inst_os.path, script, old_name, instance.name,
594                                 real_os_dev.dev_path, real_swap_dev.dev_path,
595                                 logfile)
596
597   result = utils.RunCmd(command)
598
599   if result.failed:
600     logging.error("os create command '%s' returned error: %s output: %s",
601                   command, result.fail_reason, result.output)
602     return False
603
604   return True
605
606
607 def _GetVGInfo(vg_name):
608   """Get informations about the volume group.
609
610   Args:
611     vg_name: the volume group
612
613   Returns:
614     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
615     where
616     vg_size is the total size of the volume group in MiB
617     vg_free is the free size of the volume group in MiB
618     pv_count are the number of physical disks in that vg
619
620   If an error occurs during gathering of data, we return the same dict
621   with keys all set to None.
622
623   """
624   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
625
626   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
627                          "--nosuffix", "--units=m", "--separator=:", vg_name])
628
629   if retval.failed:
630     logging.error("volume group %s not present", vg_name)
631     return retdic
632   valarr = retval.stdout.strip().rstrip(':').split(':')
633   if len(valarr) == 3:
634     try:
635       retdic = {
636         "vg_size": int(round(float(valarr[0]), 0)),
637         "vg_free": int(round(float(valarr[1]), 0)),
638         "pv_count": int(valarr[2]),
639         }
640     except ValueError, err:
641       logging.exception("Fail to parse vgs output")
642   else:
643     logging.error("vgs output has the wrong number of fields (expected"
644                   " three): %s", str(valarr))
645   return retdic
646
647
648 def _GatherBlockDevs(instance):
649   """Set up an instance's block device(s).
650
651   This is run on the primary node at instance startup. The block
652   devices must be already assembled.
653
654   """
655   block_devices = []
656   for disk in instance.disks:
657     device = _RecursiveFindBD(disk)
658     if device is None:
659       raise errors.BlockDeviceError("Block device '%s' is not set up." %
660                                     str(disk))
661     device.Open()
662     block_devices.append((disk, device))
663   return block_devices
664
665
666 def StartInstance(instance, extra_args):
667   """Start an instance.
668
669   Args:
670     instance - name of instance to start.
671
672   """
673   running_instances = GetInstanceList()
674
675   if instance.name in running_instances:
676     return True
677
678   block_devices = _GatherBlockDevs(instance)
679   hyper = hypervisor.GetHypervisor(_GetConfig())
680
681   try:
682     hyper.StartInstance(instance, block_devices, extra_args)
683   except errors.HypervisorError, err:
684     logging.exception("Failed to start instance")
685     return False
686
687   return True
688
689
690 def ShutdownInstance(instance):
691   """Shut an instance down.
692
693   Args:
694     instance - name of instance to shutdown.
695
696   """
697   running_instances = GetInstanceList()
698
699   if instance.name not in running_instances:
700     return True
701
702   hyper = hypervisor.GetHypervisor(_GetConfig())
703   try:
704     hyper.StopInstance(instance)
705   except errors.HypervisorError, err:
706     logging.error("Failed to stop instance")
707     return False
708
709   # test every 10secs for 2min
710   shutdown_ok = False
711
712   time.sleep(1)
713   for dummy in range(11):
714     if instance.name not in GetInstanceList():
715       break
716     time.sleep(10)
717   else:
718     # the shutdown did not succeed
719     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
720
721     try:
722       hyper.StopInstance(instance, force=True)
723     except errors.HypervisorError, err:
724       logging.exception("Failed to stop instance")
725       return False
726
727     time.sleep(1)
728     if instance.name in GetInstanceList():
729       logging.error("could not shutdown instance '%s' even by destroy",
730                     instance.name)
731       return False
732
733   return True
734
735
736 def RebootInstance(instance, reboot_type, extra_args):
737   """Reboot an instance.
738
739   Args:
740     instance    - name of instance to reboot
741     reboot_type - how to reboot [soft,hard,full]
742
743   """
744   running_instances = GetInstanceList()
745
746   if instance.name not in running_instances:
747     logging.error("Cannot reboot instance that is not running")
748     return False
749
750   hyper = hypervisor.GetHypervisor(_GetConfig())
751   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
752     try:
753       hyper.RebootInstance(instance)
754     except errors.HypervisorError, err:
755       logging.exception("Failed to soft reboot instance")
756       return False
757   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
758     try:
759       ShutdownInstance(instance)
760       StartInstance(instance, extra_args)
761     except errors.HypervisorError, err:
762       logging.exception("Failed to hard reboot instance")
763       return False
764   else:
765     raise errors.ParameterError("reboot_type invalid")
766
767
768   return True
769
770
771 def MigrateInstance(instance, target, live):
772   """Migrates an instance to another node.
773
774   @type instance: C{objects.Instance}
775   @param instance: the instance definition
776   @type target: string
777   @param target: the target node name
778   @type live: boolean
779   @param live: whether the migration should be done live or not (the
780       interpretation of this parameter is left to the hypervisor)
781   @rtype: tuple
782   @return: a tuple of (success, msg) where:
783       - succes is a boolean denoting the success/failure of the operation
784       - msg is a string with details in case of failure
785
786   """
787   hyper = hypervisor.GetHypervisor(_GetConfig())
788
789   try:
790     hyper.MigrateInstance(instance.name, target, live)
791   except errors.HypervisorError, err:
792     msg = "Failed to migrate instance: %s" % str(err)
793     logging.error(msg)
794     return (False, msg)
795   return (True, "Migration successfull")
796
797
798 def CreateBlockDevice(disk, size, owner, on_primary, info):
799   """Creates a block device for an instance.
800
801   Args:
802    disk: a ganeti.objects.Disk object
803    size: the size of the physical underlying device
804    owner: a string with the name of the instance
805    on_primary: a boolean indicating if it is the primary node or not
806    info: string that will be sent to the physical device creation
807
808   Returns:
809     the new unique_id of the device (this can sometime be
810     computed only after creation), or None. On secondary nodes,
811     it's not required to return anything.
812
813   """
814   clist = []
815   if disk.children:
816     for child in disk.children:
817       crdev = _RecursiveAssembleBD(child, owner, on_primary)
818       if on_primary or disk.AssembleOnSecondary():
819         # we need the children open in case the device itself has to
820         # be assembled
821         crdev.Open()
822       clist.append(crdev)
823   try:
824     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
825     if device is not None:
826       logging.info("removing existing device %s", disk)
827       device.Remove()
828   except errors.BlockDeviceError, err:
829     pass
830
831   device = bdev.Create(disk.dev_type, disk.physical_id,
832                        clist, size)
833   if device is None:
834     raise ValueError("Can't create child device for %s, %s" %
835                      (disk, size))
836   if on_primary or disk.AssembleOnSecondary():
837     if not device.Assemble():
838       errorstring = "Can't assemble device after creation"
839       logging.error(errorstring)
840       raise errors.BlockDeviceError("%s, very unusual event - check the node"
841                                     " daemon logs" % errorstring)
842     device.SetSyncSpeed(constants.SYNC_SPEED)
843     if on_primary or disk.OpenOnSecondary():
844       device.Open(force=True)
845     DevCacheManager.UpdateCache(device.dev_path, owner,
846                                 on_primary, disk.iv_name)
847
848   device.SetInfo(info)
849
850   physical_id = device.unique_id
851   return physical_id
852
853
854 def RemoveBlockDevice(disk):
855   """Remove a block device.
856
857   This is intended to be called recursively.
858
859   """
860   try:
861     # since we are removing the device, allow a partial match
862     # this allows removal of broken mirrors
863     rdev = _RecursiveFindBD(disk, allow_partial=True)
864   except errors.BlockDeviceError, err:
865     # probably can't attach
866     logging.info("Can't attach to device %s in remove", disk)
867     rdev = None
868   if rdev is not None:
869     r_path = rdev.dev_path
870     result = rdev.Remove()
871     if result:
872       DevCacheManager.RemoveCache(r_path)
873   else:
874     result = True
875   if disk.children:
876     for child in disk.children:
877       result = result and RemoveBlockDevice(child)
878   return result
879
880
881 def _RecursiveAssembleBD(disk, owner, as_primary):
882   """Activate a block device for an instance.
883
884   This is run on the primary and secondary nodes for an instance.
885
886   This function is called recursively.
887
888   Args:
889     disk: a objects.Disk object
890     as_primary: if we should make the block device read/write
891
892   Returns:
893     the assembled device or None (in case no device was assembled)
894
895   If the assembly is not successful, an exception is raised.
896
897   """
898   children = []
899   if disk.children:
900     mcn = disk.ChildrenNeeded()
901     if mcn == -1:
902       mcn = 0 # max number of Nones allowed
903     else:
904       mcn = len(disk.children) - mcn # max number of Nones
905     for chld_disk in disk.children:
906       try:
907         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
908       except errors.BlockDeviceError, err:
909         if children.count(None) >= mcn:
910           raise
911         cdev = None
912         logging.debug("Error in child activation: %s", str(err))
913       children.append(cdev)
914
915   if as_primary or disk.AssembleOnSecondary():
916     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
917     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
918     result = r_dev
919     if as_primary or disk.OpenOnSecondary():
920       r_dev.Open()
921     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
922                                 as_primary, disk.iv_name)
923
924   else:
925     result = True
926   return result
927
928
929 def AssembleBlockDevice(disk, owner, as_primary):
930   """Activate a block device for an instance.
931
932   This is a wrapper over _RecursiveAssembleBD.
933
934   Returns:
935     a /dev path for primary nodes
936     True for secondary nodes
937
938   """
939   result = _RecursiveAssembleBD(disk, owner, as_primary)
940   if isinstance(result, bdev.BlockDev):
941     result = result.dev_path
942   return result
943
944
945 def ShutdownBlockDevice(disk):
946   """Shut down a block device.
947
948   First, if the device is assembled (can `Attach()`), then the device
949   is shutdown. Then the children of the device are shutdown.
950
951   This function is called recursively. Note that we don't cache the
952   children or such, as oppossed to assemble, shutdown of different
953   devices doesn't require that the upper device was active.
954
955   """
956   r_dev = _RecursiveFindBD(disk)
957   if r_dev is not None:
958     r_path = r_dev.dev_path
959     result = r_dev.Shutdown()
960     if result:
961       DevCacheManager.RemoveCache(r_path)
962   else:
963     result = True
964   if disk.children:
965     for child in disk.children:
966       result = result and ShutdownBlockDevice(child)
967   return result
968
969
970 def MirrorAddChildren(parent_cdev, new_cdevs):
971   """Extend a mirrored block device.
972
973   """
974   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
975   if parent_bdev is None:
976     logging.error("Can't find parent device")
977     return False
978   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
979   if new_bdevs.count(None) > 0:
980     logging.error("Can't find new device(s) to add: %s:%s",
981                   new_bdevs, new_cdevs)
982     return False
983   parent_bdev.AddChildren(new_bdevs)
984   return True
985
986
987 def MirrorRemoveChildren(parent_cdev, new_cdevs):
988   """Shrink a mirrored block device.
989
990   """
991   parent_bdev = _RecursiveFindBD(parent_cdev)
992   if parent_bdev is None:
993     logging.error("Can't find parent in remove children: %s", parent_cdev)
994     return False
995   devs = []
996   for disk in new_cdevs:
997     rpath = disk.StaticDevPath()
998     if rpath is None:
999       bd = _RecursiveFindBD(disk)
1000       if bd is None:
1001         logging.error("Can't find dynamic device %s while removing children",
1002                       disk)
1003         return False
1004       else:
1005         devs.append(bd.dev_path)
1006     else:
1007       devs.append(rpath)
1008   parent_bdev.RemoveChildren(devs)
1009   return True
1010
1011
1012 def GetMirrorStatus(disks):
1013   """Get the mirroring status of a list of devices.
1014
1015   Args:
1016     disks: list of `objects.Disk`
1017
1018   Returns:
1019     list of (mirror_done, estimated_time) tuples, which
1020     are the result of bdev.BlockDevice.CombinedSyncStatus()
1021
1022   """
1023   stats = []
1024   for dsk in disks:
1025     rbd = _RecursiveFindBD(dsk)
1026     if rbd is None:
1027       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1028     stats.append(rbd.CombinedSyncStatus())
1029   return stats
1030
1031
1032 def _RecursiveFindBD(disk, allow_partial=False):
1033   """Check if a device is activated.
1034
1035   If so, return informations about the real device.
1036
1037   Args:
1038     disk: the objects.Disk instance
1039     allow_partial: don't abort the find if a child of the
1040                    device can't be found; this is intended to be
1041                    used when repairing mirrors
1042
1043   Returns:
1044     None if the device can't be found
1045     otherwise the device instance
1046
1047   """
1048   children = []
1049   if disk.children:
1050     for chdisk in disk.children:
1051       children.append(_RecursiveFindBD(chdisk))
1052
1053   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1054
1055
1056 def FindBlockDevice(disk):
1057   """Check if a device is activated.
1058
1059   If so, return informations about the real device.
1060
1061   Args:
1062     disk: the objects.Disk instance
1063   Returns:
1064     None if the device can't be found
1065     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1066
1067   """
1068   rbd = _RecursiveFindBD(disk)
1069   if rbd is None:
1070     return rbd
1071   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1072
1073
1074 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1075   """Write a file to the filesystem.
1076
1077   This allows the master to overwrite(!) a file. It will only perform
1078   the operation if the file belongs to a list of configuration files.
1079
1080   """
1081   if not os.path.isabs(file_name):
1082     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1083                   file_name)
1084     return False
1085
1086   allowed_files = [
1087     constants.CLUSTER_CONF_FILE,
1088     constants.ETC_HOSTS,
1089     constants.SSH_KNOWN_HOSTS_FILE,
1090     constants.VNC_PASSWORD_FILE,
1091     ]
1092
1093   if file_name not in allowed_files:
1094     logging.error("Filename passed to UploadFile not in allowed"
1095                  " upload targets: '%s'", file_name)
1096     return False
1097
1098   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1099                   atime=atime, mtime=mtime)
1100   return True
1101
1102
1103 def _ErrnoOrStr(err):
1104   """Format an EnvironmentError exception.
1105
1106   If the `err` argument has an errno attribute, it will be looked up
1107   and converted into a textual EXXXX description. Otherwise the string
1108   representation of the error will be returned.
1109
1110   """
1111   if hasattr(err, 'errno'):
1112     detail = errno.errorcode[err.errno]
1113   else:
1114     detail = str(err)
1115   return detail
1116
1117
1118 def _OSOndiskVersion(name, os_dir):
1119   """Compute and return the API version of a given OS.
1120
1121   This function will try to read the API version of the os given by
1122   the 'name' parameter and residing in the 'os_dir' directory.
1123
1124   Return value will be either an integer denoting the version or None in the
1125   case when this is not a valid OS name.
1126
1127   """
1128   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1129
1130   try:
1131     st = os.stat(api_file)
1132   except EnvironmentError, err:
1133     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1134                            " found (%s)" % _ErrnoOrStr(err))
1135
1136   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1137     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1138                            " a regular file")
1139
1140   try:
1141     f = open(api_file)
1142     try:
1143       api_version = f.read(256)
1144     finally:
1145       f.close()
1146   except EnvironmentError, err:
1147     raise errors.InvalidOS(name, os_dir, "error while reading the"
1148                            " API version (%s)" % _ErrnoOrStr(err))
1149
1150   api_version = api_version.strip()
1151   try:
1152     api_version = int(api_version)
1153   except (TypeError, ValueError), err:
1154     raise errors.InvalidOS(name, os_dir,
1155                            "API version is not integer (%s)" % str(err))
1156
1157   return api_version
1158
1159
1160 def DiagnoseOS(top_dirs=None):
1161   """Compute the validity for all OSes.
1162
1163   Returns an OS object for each name in all the given top directories
1164   (if not given defaults to constants.OS_SEARCH_PATH)
1165
1166   Returns:
1167     list of OS objects
1168
1169   """
1170   if top_dirs is None:
1171     top_dirs = constants.OS_SEARCH_PATH
1172
1173   result = []
1174   for dir_name in top_dirs:
1175     if os.path.isdir(dir_name):
1176       try:
1177         f_names = utils.ListVisibleFiles(dir_name)
1178       except EnvironmentError, err:
1179         logging.exception("Can't list the OS directory %s", dir_name)
1180         break
1181       for name in f_names:
1182         try:
1183           os_inst = OSFromDisk(name, base_dir=dir_name)
1184           result.append(os_inst)
1185         except errors.InvalidOS, err:
1186           result.append(objects.OS.FromInvalidOS(err))
1187
1188   return result
1189
1190
1191 def OSFromDisk(name, base_dir=None):
1192   """Create an OS instance from disk.
1193
1194   This function will return an OS instance if the given name is a
1195   valid OS name. Otherwise, it will raise an appropriate
1196   `errors.InvalidOS` exception, detailing why this is not a valid
1197   OS.
1198
1199   Args:
1200     os_dir: Directory containing the OS scripts. Defaults to a search
1201             in all the OS_SEARCH_PATH directories.
1202
1203   """
1204
1205   if base_dir is None:
1206     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1207     if os_dir is None:
1208       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1209   else:
1210     os_dir = os.path.sep.join([base_dir, name])
1211
1212   api_version = _OSOndiskVersion(name, os_dir)
1213
1214   if api_version != constants.OS_API_VERSION:
1215     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1216                            " (found %s want %s)"
1217                            % (api_version, constants.OS_API_VERSION))
1218
1219   # OS Scripts dictionary, we will populate it with the actual script names
1220   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1221
1222   for script in os_scripts:
1223     os_scripts[script] = os.path.sep.join([os_dir, script])
1224
1225     try:
1226       st = os.stat(os_scripts[script])
1227     except EnvironmentError, err:
1228       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1229                              (script, _ErrnoOrStr(err)))
1230
1231     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1232       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1233                              script)
1234
1235     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1236       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1237                              script)
1238
1239
1240   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1241                     create_script=os_scripts['create'],
1242                     export_script=os_scripts['export'],
1243                     import_script=os_scripts['import'],
1244                     rename_script=os_scripts['rename'],
1245                     api_version=api_version)
1246
1247
1248 def GrowBlockDevice(disk, amount):
1249   """Grow a stack of block devices.
1250
1251   This function is called recursively, with the childrens being the
1252   first one resize.
1253
1254   Args:
1255     disk: the disk to be grown
1256
1257   Returns: a tuple of (status, result), with:
1258     status: the result (true/false) of the operation
1259     result: the error message if the operation failed, otherwise not used
1260
1261   """
1262   r_dev = _RecursiveFindBD(disk)
1263   if r_dev is None:
1264     return False, "Cannot find block device %s" % (disk,)
1265
1266   try:
1267     r_dev.Grow(amount)
1268   except errors.BlockDeviceError, err:
1269     return False, str(err)
1270
1271   return True, None
1272
1273
1274 def SnapshotBlockDevice(disk):
1275   """Create a snapshot copy of a block device.
1276
1277   This function is called recursively, and the snapshot is actually created
1278   just for the leaf lvm backend device.
1279
1280   Args:
1281     disk: the disk to be snapshotted
1282
1283   Returns:
1284     a config entry for the actual lvm device snapshotted.
1285
1286   """
1287   if disk.children:
1288     if len(disk.children) == 1:
1289       # only one child, let's recurse on it
1290       return SnapshotBlockDevice(disk.children[0])
1291     else:
1292       # more than one child, choose one that matches
1293       for child in disk.children:
1294         if child.size == disk.size:
1295           # return implies breaking the loop
1296           return SnapshotBlockDevice(child)
1297   elif disk.dev_type == constants.LD_LV:
1298     r_dev = _RecursiveFindBD(disk)
1299     if r_dev is not None:
1300       # let's stay on the safe side and ask for the full size, for now
1301       return r_dev.Snapshot(disk.size)
1302     else:
1303       return None
1304   else:
1305     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1306                                  " '%s' of type '%s'" %
1307                                  (disk.unique_id, disk.dev_type))
1308
1309
1310 def ExportSnapshot(disk, dest_node, instance, cluster_name):
1311   """Export a block device snapshot to a remote node.
1312
1313   Args:
1314     disk: the snapshot block device
1315     dest_node: the node to send the image to
1316     instance: instance being exported
1317
1318   Returns:
1319     True if successful, False otherwise.
1320
1321   """
1322   inst_os = OSFromDisk(instance.os)
1323   export_script = inst_os.export_script
1324
1325   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1326                                      instance.name, int(time.time()))
1327   if not os.path.exists(constants.LOG_OS_DIR):
1328     os.mkdir(constants.LOG_OS_DIR, 0750)
1329
1330   real_os_dev = _RecursiveFindBD(disk)
1331   if real_os_dev is None:
1332     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1333                                   str(disk))
1334   real_os_dev.Open()
1335
1336   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1337   destfile = disk.physical_id[1]
1338
1339   # the target command is built out of three individual commands,
1340   # which are joined by pipes; we check each individual command for
1341   # valid parameters
1342
1343   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1344                                export_script, instance.name,
1345                                real_os_dev.dev_path, logfile)
1346
1347   comprcmd = "gzip"
1348
1349   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1350                                 destdir, destdir, destfile)
1351   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1352                                                    constants.GANETI_RUNAS,
1353                                                    destcmd)
1354
1355   # all commands have been checked, so we're safe to combine them
1356   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1357
1358   result = utils.RunCmd(command)
1359
1360   if result.failed:
1361     logging.error("os snapshot export command '%s' returned error: %s"
1362                   " output: %s", command, result.fail_reason, result.output)
1363     return False
1364
1365   return True
1366
1367
1368 def FinalizeExport(instance, snap_disks):
1369   """Write out the export configuration information.
1370
1371   Args:
1372     instance: instance configuration
1373     snap_disks: snapshot block devices
1374
1375   Returns:
1376     False in case of error, True otherwise.
1377
1378   """
1379   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1380   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1381
1382   config = objects.SerializableConfigParser()
1383
1384   config.add_section(constants.INISECT_EXP)
1385   config.set(constants.INISECT_EXP, 'version', '0')
1386   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1387   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1388   config.set(constants.INISECT_EXP, 'os', instance.os)
1389   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1390
1391   config.add_section(constants.INISECT_INS)
1392   config.set(constants.INISECT_INS, 'name', instance.name)
1393   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1394   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1395   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1396
1397   nic_count = 0
1398   for nic_count, nic in enumerate(instance.nics):
1399     config.set(constants.INISECT_INS, 'nic%d_mac' %
1400                nic_count, '%s' % nic.mac)
1401     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1402     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1403                '%s' % nic.bridge)
1404   # TODO: redundant: on load can read nics until it doesn't exist
1405   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1406
1407   disk_count = 0
1408   for disk_count, disk in enumerate(snap_disks):
1409     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1410                ('%s' % disk.iv_name))
1411     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1412                ('%s' % disk.physical_id[1]))
1413     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1414                ('%d' % disk.size))
1415   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1416
1417   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1418   cfo = open(cff, 'w')
1419   try:
1420     config.write(cfo)
1421   finally:
1422     cfo.close()
1423
1424   shutil.rmtree(finaldestdir, True)
1425   shutil.move(destdir, finaldestdir)
1426
1427   return True
1428
1429
1430 def ExportInfo(dest):
1431   """Get export configuration information.
1432
1433   Args:
1434     dest: directory containing the export
1435
1436   Returns:
1437     A serializable config file containing the export info.
1438
1439   """
1440   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1441
1442   config = objects.SerializableConfigParser()
1443   config.read(cff)
1444
1445   if (not config.has_section(constants.INISECT_EXP) or
1446       not config.has_section(constants.INISECT_INS)):
1447     return None
1448
1449   return config
1450
1451
1452 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1453                          cluster_name):
1454   """Import an os image into an instance.
1455
1456   Args:
1457     instance: the instance object
1458     os_disk: the instance-visible name of the os device
1459     swap_disk: the instance-visible name of the swap device
1460     src_node: node holding the source image
1461     src_image: path to the source image on src_node
1462
1463   Returns:
1464     False in case of error, True otherwise.
1465
1466   """
1467   cfg = _GetConfig()
1468   inst_os = OSFromDisk(instance.os)
1469   import_script = inst_os.import_script
1470
1471   os_device = instance.FindDisk(os_disk)
1472   if os_device is None:
1473     logging.error("Can't find this device-visible name '%s'", os_disk)
1474     return False
1475
1476   swap_device = instance.FindDisk(swap_disk)
1477   if swap_device is None:
1478     logging.error("Can't find this device-visible name '%s'", swap_disk)
1479     return False
1480
1481   real_os_dev = _RecursiveFindBD(os_device)
1482   if real_os_dev is None:
1483     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1484                                   str(os_device))
1485   real_os_dev.Open()
1486
1487   real_swap_dev = _RecursiveFindBD(swap_device)
1488   if real_swap_dev is None:
1489     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1490                                   str(swap_device))
1491   real_swap_dev.Open()
1492
1493   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1494                                         instance.name, int(time.time()))
1495   if not os.path.exists(constants.LOG_OS_DIR):
1496     os.mkdir(constants.LOG_OS_DIR, 0750)
1497
1498   destcmd = utils.BuildShellCmd('cat %s', src_image)
1499   remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1500                                                    constants.GANETI_RUNAS,
1501                                                    destcmd)
1502
1503   comprcmd = "gunzip"
1504   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1505                                inst_os.path, import_script, instance.name,
1506                                real_os_dev.dev_path, real_swap_dev.dev_path,
1507                                logfile)
1508
1509   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1510   env = {'HYPERVISOR': cfg.GetHypervisorType()}
1511
1512   result = utils.RunCmd(command, env=env)
1513
1514   if result.failed:
1515     logging.error("os import command '%s' returned error: %s"
1516                   " output: %s", command, result.fail_reason, result.output)
1517     return False
1518
1519   return True
1520
1521
1522 def ListExports():
1523   """Return a list of exports currently available on this machine.
1524
1525   """
1526   if os.path.isdir(constants.EXPORT_DIR):
1527     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1528   else:
1529     return []
1530
1531
1532 def RemoveExport(export):
1533   """Remove an existing export from the node.
1534
1535   Args:
1536     export: the name of the export to remove
1537
1538   Returns:
1539     False in case of error, True otherwise.
1540
1541   """
1542   target = os.path.join(constants.EXPORT_DIR, export)
1543
1544   shutil.rmtree(target)
1545   # TODO: catch some of the relevant exceptions and provide a pretty
1546   # error message if rmtree fails.
1547
1548   return True
1549
1550
1551 def RenameBlockDevices(devlist):
1552   """Rename a list of block devices.
1553
1554   The devlist argument is a list of tuples (disk, new_logical,
1555   new_physical). The return value will be a combined boolean result
1556   (True only if all renames succeeded).
1557
1558   """
1559   result = True
1560   for disk, unique_id in devlist:
1561     dev = _RecursiveFindBD(disk)
1562     if dev is None:
1563       result = False
1564       continue
1565     try:
1566       old_rpath = dev.dev_path
1567       dev.Rename(unique_id)
1568       new_rpath = dev.dev_path
1569       if old_rpath != new_rpath:
1570         DevCacheManager.RemoveCache(old_rpath)
1571         # FIXME: we should add the new cache information here, like:
1572         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1573         # but we don't have the owner here - maybe parse from existing
1574         # cache? for now, we only lose lvm data when we rename, which
1575         # is less critical than DRBD or MD
1576     except errors.BlockDeviceError, err:
1577       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1578       result = False
1579   return result
1580
1581
1582 def _TransformFileStorageDir(file_storage_dir):
1583   """Checks whether given file_storage_dir is valid.
1584
1585   Checks wheter the given file_storage_dir is within the cluster-wide
1586   default file_storage_dir stored in SimpleStore. Only paths under that
1587   directory are allowed.
1588
1589   Args:
1590     file_storage_dir: string with path
1591
1592   Returns:
1593     normalized file_storage_dir (string) if valid, None otherwise
1594
1595   """
1596   cfg = _GetConfig()
1597   file_storage_dir = os.path.normpath(file_storage_dir)
1598   base_file_storage_dir = cfg.GetFileStorageDir()
1599   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1600       base_file_storage_dir):
1601     logging.error("file storage directory '%s' is not under base file"
1602                   " storage directory '%s'",
1603                   file_storage_dir, base_file_storage_dir)
1604     return None
1605   return file_storage_dir
1606
1607
1608 def CreateFileStorageDir(file_storage_dir):
1609   """Create file storage directory.
1610
1611   Args:
1612     file_storage_dir: string containing the path
1613
1614   Returns:
1615     tuple with first element a boolean indicating wheter dir
1616     creation was successful or not
1617
1618   """
1619   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1620   result = True,
1621   if not file_storage_dir:
1622     result = False,
1623   else:
1624     if os.path.exists(file_storage_dir):
1625       if not os.path.isdir(file_storage_dir):
1626         logging.error("'%s' is not a directory", file_storage_dir)
1627         result = False,
1628     else:
1629       try:
1630         os.makedirs(file_storage_dir, 0750)
1631       except OSError, err:
1632         logging.error("Cannot create file storage directory '%s': %s",
1633                       file_storage_dir, err)
1634         result = False,
1635   return result
1636
1637
1638 def RemoveFileStorageDir(file_storage_dir):
1639   """Remove file storage directory.
1640
1641   Remove it only if it's empty. If not log an error and return.
1642
1643   Args:
1644     file_storage_dir: string containing the path
1645
1646   Returns:
1647     tuple with first element a boolean indicating wheter dir
1648     removal was successful or not
1649
1650   """
1651   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1652   result = True,
1653   if not file_storage_dir:
1654     result = False,
1655   else:
1656     if os.path.exists(file_storage_dir):
1657       if not os.path.isdir(file_storage_dir):
1658         logging.error("'%s' is not a directory", file_storage_dir)
1659         result = False,
1660       # deletes dir only if empty, otherwise we want to return False
1661       try:
1662         os.rmdir(file_storage_dir)
1663       except OSError, err:
1664         logging.exception("Cannot remove file storage directory '%s'",
1665                           file_storage_dir)
1666         result = False,
1667   return result
1668
1669
1670 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1671   """Rename the file storage directory.
1672
1673   Args:
1674     old_file_storage_dir: string containing the old path
1675     new_file_storage_dir: string containing the new path
1676
1677   Returns:
1678     tuple with first element a boolean indicating wheter dir
1679     rename was successful or not
1680
1681   """
1682   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1683   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1684   result = True,
1685   if not old_file_storage_dir or not new_file_storage_dir:
1686     result = False,
1687   else:
1688     if not os.path.exists(new_file_storage_dir):
1689       if os.path.isdir(old_file_storage_dir):
1690         try:
1691           os.rename(old_file_storage_dir, new_file_storage_dir)
1692         except OSError, err:
1693           logging.exception("Cannot rename '%s' to '%s'",
1694                             old_file_storage_dir, new_file_storage_dir)
1695           result =  False,
1696       else:
1697         logging.error("'%s' is not a directory", old_file_storage_dir)
1698         result = False,
1699     else:
1700       if os.path.exists(old_file_storage_dir):
1701         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1702                       old_file_storage_dir, new_file_storage_dir)
1703         result = False,
1704   return result
1705
1706
1707 def _IsJobQueueFile(file_name):
1708   """Checks whether the given filename is in the queue directory.
1709
1710   """
1711   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1712   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1713
1714   if not result:
1715     logging.error("'%s' is not a file in the queue directory",
1716                   file_name)
1717
1718   return result
1719
1720
1721 def JobQueueUpdate(file_name, content):
1722   """Updates a file in the queue directory.
1723
1724   """
1725   if not _IsJobQueueFile(file_name):
1726     return False
1727
1728   # Write and replace the file atomically
1729   utils.WriteFile(file_name, data=content)
1730
1731   return True
1732
1733
1734 def JobQueueRename(old, new):
1735   """Renames a job queue file.
1736
1737   """
1738   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1739     return False
1740
1741   os.rename(old, new)
1742
1743   return True
1744
1745
1746 def CloseBlockDevices(disks):
1747   """Closes the given block devices.
1748
1749   This means they will be switched to secondary mode (in case of DRBD).
1750
1751   """
1752   bdevs = []
1753   for cf in disks:
1754     rd = _RecursiveFindBD(cf)
1755     if rd is None:
1756       return (False, "Can't find device %s" % cf)
1757     bdevs.append(rd)
1758
1759   msg = []
1760   for rd in bdevs:
1761     try:
1762       rd.Close()
1763     except errors.BlockDeviceError, err:
1764       msg.append(str(err))
1765   if msg:
1766     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1767   else:
1768     return (True, "All devices secondary")
1769
1770
1771 class HooksRunner(object):
1772   """Hook runner.
1773
1774   This class is instantiated on the node side (ganeti-noded) and not on
1775   the master side.
1776
1777   """
1778   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1779
1780   def __init__(self, hooks_base_dir=None):
1781     """Constructor for hooks runner.
1782
1783     Args:
1784       - hooks_base_dir: if not None, this overrides the
1785         constants.HOOKS_BASE_DIR (useful for unittests)
1786
1787     """
1788     if hooks_base_dir is None:
1789       hooks_base_dir = constants.HOOKS_BASE_DIR
1790     self._BASE_DIR = hooks_base_dir
1791
1792   @staticmethod
1793   def ExecHook(script, env):
1794     """Exec one hook script.
1795
1796     Args:
1797      - script: the full path to the script
1798      - env: the environment with which to exec the script
1799
1800     """
1801     # exec the process using subprocess and log the output
1802     fdstdin = None
1803     try:
1804       fdstdin = open("/dev/null", "r")
1805       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1806                                stderr=subprocess.STDOUT, close_fds=True,
1807                                shell=False, cwd="/", env=env)
1808       output = ""
1809       try:
1810         output = child.stdout.read(4096)
1811         child.stdout.close()
1812       except EnvironmentError, err:
1813         output += "Hook script error: %s" % str(err)
1814
1815       while True:
1816         try:
1817           result = child.wait()
1818           break
1819         except EnvironmentError, err:
1820           if err.errno == errno.EINTR:
1821             continue
1822           raise
1823     finally:
1824       # try not to leak fds
1825       for fd in (fdstdin, ):
1826         if fd is not None:
1827           try:
1828             fd.close()
1829           except EnvironmentError, err:
1830             # just log the error
1831             #logging.exception("Error while closing fd %s", fd)
1832             pass
1833
1834     return result == 0, output
1835
1836   def RunHooks(self, hpath, phase, env):
1837     """Run the scripts in the hooks directory.
1838
1839     This method will not be usually overriden by child opcodes.
1840
1841     """
1842     if phase == constants.HOOKS_PHASE_PRE:
1843       suffix = "pre"
1844     elif phase == constants.HOOKS_PHASE_POST:
1845       suffix = "post"
1846     else:
1847       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1848     rr = []
1849
1850     subdir = "%s-%s.d" % (hpath, suffix)
1851     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1852     try:
1853       dir_contents = utils.ListVisibleFiles(dir_name)
1854     except OSError, err:
1855       # must log
1856       return rr
1857
1858     # we use the standard python sort order,
1859     # so 00name is the recommended naming scheme
1860     dir_contents.sort()
1861     for relname in dir_contents:
1862       fname = os.path.join(dir_name, relname)
1863       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1864           self.RE_MASK.match(relname) is not None):
1865         rrval = constants.HKR_SKIP
1866         output = ""
1867       else:
1868         result, output = self.ExecHook(fname, env)
1869         if not result:
1870           rrval = constants.HKR_FAIL
1871         else:
1872           rrval = constants.HKR_SUCCESS
1873       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1874
1875     return rr
1876
1877
1878 class IAllocatorRunner(object):
1879   """IAllocator runner.
1880
1881   This class is instantiated on the node side (ganeti-noded) and not on
1882   the master side.
1883
1884   """
1885   def Run(self, name, idata):
1886     """Run an iallocator script.
1887
1888     Return value: tuple of:
1889        - run status (one of the IARUN_ constants)
1890        - stdout
1891        - stderr
1892        - fail reason (as from utils.RunResult)
1893
1894     """
1895     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1896                                   os.path.isfile)
1897     if alloc_script is None:
1898       return (constants.IARUN_NOTFOUND, None, None, None)
1899
1900     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1901     try:
1902       os.write(fd, idata)
1903       os.close(fd)
1904       result = utils.RunCmd([alloc_script, fin_name])
1905       if result.failed:
1906         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1907                 result.fail_reason)
1908     finally:
1909       os.unlink(fin_name)
1910
1911     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1912
1913
1914 class DevCacheManager(object):
1915   """Simple class for managing a cache of block device information.
1916
1917   """
1918   _DEV_PREFIX = "/dev/"
1919   _ROOT_DIR = constants.BDEV_CACHE_DIR
1920
1921   @classmethod
1922   def _ConvertPath(cls, dev_path):
1923     """Converts a /dev/name path to the cache file name.
1924
1925     This replaces slashes with underscores and strips the /dev
1926     prefix. It then returns the full path to the cache file
1927
1928     """
1929     if dev_path.startswith(cls._DEV_PREFIX):
1930       dev_path = dev_path[len(cls._DEV_PREFIX):]
1931     dev_path = dev_path.replace("/", "_")
1932     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1933     return fpath
1934
1935   @classmethod
1936   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1937     """Updates the cache information for a given device.
1938
1939     """
1940     if dev_path is None:
1941       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1942       return
1943     fpath = cls._ConvertPath(dev_path)
1944     if on_primary:
1945       state = "primary"
1946     else:
1947       state = "secondary"
1948     if iv_name is None:
1949       iv_name = "not_visible"
1950     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1951     try:
1952       utils.WriteFile(fpath, data=fdata)
1953     except EnvironmentError, err:
1954       logging.exception("Can't update bdev cache for %s", dev_path)
1955
1956   @classmethod
1957   def RemoveCache(cls, dev_path):
1958     """Remove data for a dev_path.
1959
1960     """
1961     if dev_path is None:
1962       logging.error("DevCacheManager.RemoveCache got a None dev_path")
1963       return
1964     fpath = cls._ConvertPath(dev_path)
1965     try:
1966       utils.RemoveFile(fpath)
1967     except EnvironmentError, err:
1968       logging.exception("Can't update bdev cache for %s", dev_path)