Fix job queue behaviour when loading jobs
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetConfig():
48   return ssconf.SimpleConfigReader()
49
50
51 def _GetSshRunner(cluster_name):
52   return ssh.SshRunner(cluster_name)
53
54
55 def _CleanDirectory(path, exclude=[]):
56   """Removes all regular files in a directory.
57
58   @param exclude: List of files to be excluded.
59   @type exclude: list
60
61   """
62   if not os.path.isdir(path):
63     return
64
65   # Normalize excluded paths
66   exclude = [os.path.normpath(i) for i in exclude]
67
68   for rel_name in utils.ListVisibleFiles(path):
69     full_name = os.path.normpath(os.path.join(path, rel_name))
70     if full_name in exclude:
71       continue
72     if os.path.isfile(full_name) and not os.path.islink(full_name):
73       utils.RemoveFile(full_name)
74
75
76 def JobQueuePurge():
77   """Removes job queue files and archived jobs
78
79   """
80   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82
83
84 def GetMasterInfo():
85   """Returns master information.
86
87   This is an utility function to compute master information, either
88   for consumption here or from the node daemon.
89
90   @rtype: tuple
91   @return: (master_netdev, master_ip, master_name)
92
93   """
94   try:
95     cfg = _GetConfig()
96     master_netdev = cfg.GetMasterNetdev()
97     master_ip = cfg.GetMasterIP()
98     master_node = cfg.GetMasterNode()
99   except errors.ConfigurationError, err:
100     logging.exception("Cluster configuration incomplete")
101     return (None, None)
102   return (master_netdev, master_ip, master_node)
103
104
105 def StartMaster(start_daemons):
106   """Activate local node as master node.
107
108   The function will always try activate the IP address of the master
109   (if someone else has it, then it won't). Then, if the start_daemons
110   parameter is True, it will also start the master daemons
111   (ganet-masterd and ganeti-rapi).
112
113   """
114   ok = True
115   master_netdev, master_ip, _ = GetMasterInfo()
116   if not master_netdev:
117     return False
118
119   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120     if utils.OwnIpAddress(master_ip):
121       # we already have the ip:
122       logging.debug("Already started")
123     else:
124       logging.error("Someone else has the master ip, not activating")
125       ok = False
126   else:
127     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
128                            "dev", master_netdev, "label",
129                            "%s:0" % master_netdev])
130     if result.failed:
131       logging.error("Can't activate master IP: %s", result.output)
132       ok = False
133
134     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
135                            "-s", master_ip, master_ip])
136     # we'll ignore the exit code of arping
137
138   # and now start the master and rapi daemons
139   if start_daemons:
140     for daemon in 'ganeti-masterd', 'ganeti-rapi':
141       result = utils.RunCmd([daemon])
142       if result.failed:
143         logging.error("Can't start daemon %s: %s", daemon, result.output)
144         ok = False
145   return ok
146
147
148 def StopMaster(stop_daemons):
149   """Deactivate this node as master.
150
151   The function will always try to deactivate the IP address of the
152   master. Then, if the stop_daemons parameter is True, it will also
153   stop the master daemons (ganet-masterd and ganeti-rapi).
154
155   """
156   master_netdev, master_ip, _ = GetMasterInfo()
157   if not master_netdev:
158     return False
159
160   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
161                          "dev", master_netdev])
162   if result.failed:
163     logging.error("Can't remove the master IP, error: %s", result.output)
164     # but otherwise ignore the failure
165
166   if stop_daemons:
167     # stop/kill the rapi and the master daemon
168     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
169       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
170
171   return True
172
173
174 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175   """Joins this node to the cluster.
176
177   This does the following:
178       - updates the hostkeys of the machine (rsa and dsa)
179       - adds the ssh private key to the user
180       - adds the ssh public key to the users' authorized_keys file
181
182   """
183   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187   for name, content, mode in sshd_keys:
188     utils.WriteFile(name, data=content, mode=mode)
189
190   try:
191     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
192                                                     mkdir=True)
193   except errors.OpExecError, err:
194     logging.exception("Error while processing user ssh files")
195     return False
196
197   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198     utils.WriteFile(name, data=content, mode=0600)
199
200   utils.AddAuthorizedKey(auth_keys, sshpub)
201
202   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
203
204   return True
205
206
207 def LeaveCluster():
208   """Cleans up the current node and prepares it to be removed from the cluster.
209
210   """
211   _CleanDirectory(constants.DATA_DIR)
212   JobQueuePurge()
213
214   try:
215     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
216   except errors.OpExecError:
217     logging.exception("Error while processing ssh files")
218     return
219
220   f = open(pub_key, 'r')
221   try:
222     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
223   finally:
224     f.close()
225
226   utils.RemoveFile(priv_key)
227   utils.RemoveFile(pub_key)
228
229   # Return a reassuring string to the caller, and quit
230   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231
232
233 def GetNodeInfo(vgname, hypervisor_type):
234   """Gives back a hash with different informations about the node.
235
236   @type vgname: C{string}
237   @param vgname: the name of the volume group to ask for disk space information
238   @type hypervisor_type: C{str}
239   @param hypervisor_type: the name of the hypervisor to ask for
240       memory information
241   @rtype: C{dict}
242   @return: dictionary with the following keys:
243       - vg_size is the size of the configured volume group in MiB
244       - vg_free is the free size of the volume group in MiB
245       - memory_dom0 is the memory allocated for domain0 in MiB
246       - memory_free is the currently available (free) ram in MiB
247       - memory_total is the total number of ram in MiB
248
249   """
250   outputarray = {}
251   vginfo = _GetVGInfo(vgname)
252   outputarray['vg_size'] = vginfo['vg_size']
253   outputarray['vg_free'] = vginfo['vg_free']
254
255   hyper = hypervisor.GetHypervisor(hypervisor_type)
256   hyp_info = hyper.GetNodeInfo()
257   if hyp_info is not None:
258     outputarray.update(hyp_info)
259
260   f = open("/proc/sys/kernel/random/boot_id", 'r')
261   try:
262     outputarray["bootid"] = f.read(128).rstrip("\n")
263   finally:
264     f.close()
265
266   return outputarray
267
268
269 def VerifyNode(what, cluster_name):
270   """Verify the status of the local node.
271
272   Based on the input L{what} parameter, various checks are done on the
273   local node.
274
275   If the I{filelist} key is present, this list of
276   files is checksummed and the file/checksum pairs are returned.
277
278   If the I{nodelist} key is present, we check that we have
279   connectivity via ssh with the target nodes (and check the hostname
280   report).
281
282   If the I{node-net-test} key is present, we check that we have
283   connectivity to the given nodes via both primary IP and, if
284   applicable, secondary IPs.
285
286   @type what: C{dict}
287   @param what: a dictionary of things to check:
288       - filelist: list of files for which to compute checksums
289       - nodelist: list of nodes we should check ssh communication with
290       - node-net-test: list of nodes we should check node daemon port
291         connectivity with
292       - hypervisor: list with hypervisors to run the verify for
293
294   """
295   result = {}
296
297   if 'hypervisor' in what:
298     result['hypervisor'] = my_dict = {}
299     for hv_name in what['hypervisor']:
300       my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
301
302   if 'filelist' in what:
303     result['filelist'] = utils.FingerprintFiles(what['filelist'])
304
305   if 'nodelist' in what:
306     result['nodelist'] = {}
307     random.shuffle(what['nodelist'])
308     for node in what['nodelist']:
309       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
310       if not success:
311         result['nodelist'][node] = message
312   if 'node-net-test' in what:
313     result['node-net-test'] = {}
314     my_name = utils.HostInfo().name
315     my_pip = my_sip = None
316     for name, pip, sip in what['node-net-test']:
317       if name == my_name:
318         my_pip = pip
319         my_sip = sip
320         break
321     if not my_pip:
322       result['node-net-test'][my_name] = ("Can't find my own"
323                                           " primary/secondary IP"
324                                           " in the node list")
325     else:
326       port = utils.GetNodeDaemonPort()
327       for name, pip, sip in what['node-net-test']:
328         fail = []
329         if not utils.TcpPing(pip, port, source=my_pip):
330           fail.append("primary")
331         if sip != pip:
332           if not utils.TcpPing(sip, port, source=my_sip):
333             fail.append("secondary")
334         if fail:
335           result['node-net-test'][name] = ("failure using the %s"
336                                            " interface(s)" %
337                                            " and ".join(fail))
338
339   return result
340
341
342 def GetVolumeList(vg_name):
343   """Compute list of logical volumes and their size.
344
345   Returns:
346     dictionary of all partions (key) with their size (in MiB), inactive
347     and online status:
348     {'test1': ('20.06', True, True)}
349
350   """
351   lvs = {}
352   sep = '|'
353   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
354                          "--separator=%s" % sep,
355                          "-olv_name,lv_size,lv_attr", vg_name])
356   if result.failed:
357     logging.error("Failed to list logical volumes, lvs output: %s",
358                   result.output)
359     return result.output
360
361   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
362   for line in result.stdout.splitlines():
363     line = line.strip()
364     match = valid_line_re.match(line)
365     if not match:
366       logging.error("Invalid line returned from lvs output: '%s'", line)
367       continue
368     name, size, attr = match.groups()
369     inactive = attr[4] == '-'
370     online = attr[5] == 'o'
371     lvs[name] = (size, inactive, online)
372
373   return lvs
374
375
376 def ListVolumeGroups():
377   """List the volume groups and their size.
378
379   Returns:
380     Dictionary with keys volume name and values the size of the volume
381
382   """
383   return utils.ListVolumeGroups()
384
385
386 def NodeVolumes():
387   """List all volumes on this node.
388
389   """
390   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
391                          "--separator=|",
392                          "--options=lv_name,lv_size,devices,vg_name"])
393   if result.failed:
394     logging.error("Failed to list logical volumes, lvs output: %s",
395                   result.output)
396     return {}
397
398   def parse_dev(dev):
399     if '(' in dev:
400       return dev.split('(')[0]
401     else:
402       return dev
403
404   def map_line(line):
405     return {
406       'name': line[0].strip(),
407       'size': line[1].strip(),
408       'dev': parse_dev(line[2].strip()),
409       'vg': line[3].strip(),
410     }
411
412   return [map_line(line.split('|')) for line in result.stdout.splitlines()
413           if line.count('|') >= 3]
414
415
416 def BridgesExist(bridges_list):
417   """Check if a list of bridges exist on the current node.
418
419   Returns:
420     True if all of them exist, false otherwise
421
422   """
423   for bridge in bridges_list:
424     if not utils.BridgeExists(bridge):
425       return False
426
427   return True
428
429
430 def GetInstanceList(hypervisor_list):
431   """Provides a list of instances.
432
433   @type hypervisor_list: list
434   @param hypervisor_list: the list of hypervisors to query information
435
436   @rtype: list
437   @return: a list of all running instances on the current node
438              - instance1.example.com
439              - instance2.example.com
440
441   """
442   results = []
443   for hname in hypervisor_list:
444     try:
445       names = hypervisor.GetHypervisor(hname).ListInstances()
446       results.extend(names)
447     except errors.HypervisorError, err:
448       logging.exception("Error enumerating instances for hypevisor %s", hname)
449       # FIXME: should we somehow not propagate this to the master?
450       raise
451
452   return results
453
454
455 def GetInstanceInfo(instance, hname):
456   """Gives back the informations about an instance as a dictionary.
457
458   @type instance: string
459   @param instance: the instance name
460   @type hname: string
461   @param hname: the hypervisor type of the instance
462
463   @rtype: dict
464   @return: dictionary with the following keys:
465       - memory: memory size of instance (int)
466       - state: xen state of instance (string)
467       - time: cpu time of instance (float)
468
469   """
470   output = {}
471
472   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
473   if iinfo is not None:
474     output['memory'] = iinfo[2]
475     output['state'] = iinfo[4]
476     output['time'] = iinfo[5]
477
478   return output
479
480
481 def GetAllInstancesInfo(hypervisor_list):
482   """Gather data about all instances.
483
484   This is the equivalent of `GetInstanceInfo()`, except that it
485   computes data for all instances at once, thus being faster if one
486   needs data about more than one instance.
487
488   @type hypervisor_list: list
489   @param hypervisor_list: list of hypervisors to query for instance data
490
491   @rtype: dict of dicts
492   @return: dictionary of instance: data, with data having the following keys:
493       - memory: memory size of instance (int)
494       - state: xen state of instance (string)
495       - time: cpu time of instance (float)
496       - vcpuus: the number of vcpus
497
498   """
499   output = {}
500
501   for hname in hypervisor_list:
502     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
503     if iinfo:
504       for name, inst_id, memory, vcpus, state, times in iinfo:
505         value = {
506           'memory': memory,
507           'vcpus': vcpus,
508           'state': state,
509           'time': times,
510           }
511         if name in output and output[name] != value:
512           raise errors.HypervisorError("Instance %s running duplicate"
513                                        " with different parameters" % name)
514         output[name] = value
515
516   return output
517
518
519 def AddOSToInstance(instance, os_disk, swap_disk):
520   """Add an OS to an instance.
521
522   Args:
523     instance: the instance object
524     os_disk: the instance-visible name of the os device
525     swap_disk: the instance-visible name of the swap device
526
527   """
528   inst_os = OSFromDisk(instance.os)
529
530   create_script = inst_os.create_script
531
532   os_device = instance.FindDisk(os_disk)
533   if os_device is None:
534     logging.error("Can't find this device-visible name '%s'", os_disk)
535     return False
536
537   swap_device = instance.FindDisk(swap_disk)
538   if swap_device is None:
539     logging.error("Can't find this device-visible name '%s'", swap_disk)
540     return False
541
542   real_os_dev = _RecursiveFindBD(os_device)
543   if real_os_dev is None:
544     raise errors.BlockDeviceError("Block device '%s' is not set up" %
545                                   str(os_device))
546   real_os_dev.Open()
547
548   real_swap_dev = _RecursiveFindBD(swap_device)
549   if real_swap_dev is None:
550     raise errors.BlockDeviceError("Block device '%s' is not set up" %
551                                   str(swap_device))
552   real_swap_dev.Open()
553
554   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
555                                      instance.name, int(time.time()))
556   if not os.path.exists(constants.LOG_OS_DIR):
557     os.mkdir(constants.LOG_OS_DIR, 0750)
558
559   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
560                                 inst_os.path, create_script, instance.name,
561                                 real_os_dev.dev_path, real_swap_dev.dev_path,
562                                 logfile)
563   env = {'HYPERVISOR': instance.hypervisor}
564
565   result = utils.RunCmd(command, env=env)
566   if result.failed:
567     logging.error("os create command '%s' returned error: %s, logfile: %s,"
568                   " output: %s", command, result.fail_reason, logfile,
569                   result.output)
570     return False
571
572   return True
573
574
575 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
576   """Run the OS rename script for an instance.
577
578   Args:
579     instance: the instance object
580     old_name: the old name of the instance
581     os_disk: the instance-visible name of the os device
582     swap_disk: the instance-visible name of the swap device
583
584   """
585   inst_os = OSFromDisk(instance.os)
586
587   script = inst_os.rename_script
588
589   os_device = instance.FindDisk(os_disk)
590   if os_device is None:
591     logging.error("Can't find this device-visible name '%s'", os_disk)
592     return False
593
594   swap_device = instance.FindDisk(swap_disk)
595   if swap_device is None:
596     logging.error("Can't find this device-visible name '%s'", swap_disk)
597     return False
598
599   real_os_dev = _RecursiveFindBD(os_device)
600   if real_os_dev is None:
601     raise errors.BlockDeviceError("Block device '%s' is not set up" %
602                                   str(os_device))
603   real_os_dev.Open()
604
605   real_swap_dev = _RecursiveFindBD(swap_device)
606   if real_swap_dev is None:
607     raise errors.BlockDeviceError("Block device '%s' is not set up" %
608                                   str(swap_device))
609   real_swap_dev.Open()
610
611   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
612                                            old_name,
613                                            instance.name, int(time.time()))
614   if not os.path.exists(constants.LOG_OS_DIR):
615     os.mkdir(constants.LOG_OS_DIR, 0750)
616
617   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
618                                 inst_os.path, script, old_name, instance.name,
619                                 real_os_dev.dev_path, real_swap_dev.dev_path,
620                                 logfile)
621
622   result = utils.RunCmd(command)
623
624   if result.failed:
625     logging.error("os create command '%s' returned error: %s output: %s",
626                   command, result.fail_reason, result.output)
627     return False
628
629   return True
630
631
632 def _GetVGInfo(vg_name):
633   """Get informations about the volume group.
634
635   Args:
636     vg_name: the volume group
637
638   Returns:
639     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
640     where
641     vg_size is the total size of the volume group in MiB
642     vg_free is the free size of the volume group in MiB
643     pv_count are the number of physical disks in that vg
644
645   If an error occurs during gathering of data, we return the same dict
646   with keys all set to None.
647
648   """
649   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
650
651   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
652                          "--nosuffix", "--units=m", "--separator=:", vg_name])
653
654   if retval.failed:
655     logging.error("volume group %s not present", vg_name)
656     return retdic
657   valarr = retval.stdout.strip().rstrip(':').split(':')
658   if len(valarr) == 3:
659     try:
660       retdic = {
661         "vg_size": int(round(float(valarr[0]), 0)),
662         "vg_free": int(round(float(valarr[1]), 0)),
663         "pv_count": int(valarr[2]),
664         }
665     except ValueError, err:
666       logging.exception("Fail to parse vgs output")
667   else:
668     logging.error("vgs output has the wrong number of fields (expected"
669                   " three): %s", str(valarr))
670   return retdic
671
672
673 def _GatherBlockDevs(instance):
674   """Set up an instance's block device(s).
675
676   This is run on the primary node at instance startup. The block
677   devices must be already assembled.
678
679   """
680   block_devices = []
681   for disk in instance.disks:
682     device = _RecursiveFindBD(disk)
683     if device is None:
684       raise errors.BlockDeviceError("Block device '%s' is not set up." %
685                                     str(disk))
686     device.Open()
687     block_devices.append((disk, device))
688   return block_devices
689
690
691 def StartInstance(instance, extra_args):
692   """Start an instance.
693
694   @type instance: instance object
695   @param instance: the instance object
696   @rtype: boolean
697   @return: whether the startup was successful or not
698
699   """
700   running_instances = GetInstanceList([instance.hypervisor])
701
702   if instance.name in running_instances:
703     return True
704
705   block_devices = _GatherBlockDevs(instance)
706   hyper = hypervisor.GetHypervisor(instance.hypervisor)
707
708   try:
709     hyper.StartInstance(instance, block_devices, extra_args)
710   except errors.HypervisorError, err:
711     logging.exception("Failed to start instance")
712     return False
713
714   return True
715
716
717 def ShutdownInstance(instance):
718   """Shut an instance down.
719
720   @type instance: instance object
721   @param instance: the instance object
722   @rtype: boolean
723   @return: whether the startup was successful or not
724
725   """
726   hv_name = instance.hypervisor
727   running_instances = GetInstanceList([hv_name])
728
729   if instance.name not in running_instances:
730     return True
731
732   hyper = hypervisor.GetHypervisor(hv_name)
733   try:
734     hyper.StopInstance(instance)
735   except errors.HypervisorError, err:
736     logging.error("Failed to stop instance")
737     return False
738
739   # test every 10secs for 2min
740   shutdown_ok = False
741
742   time.sleep(1)
743   for dummy in range(11):
744     if instance.name not in GetInstanceList([hv_name]):
745       break
746     time.sleep(10)
747   else:
748     # the shutdown did not succeed
749     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
750
751     try:
752       hyper.StopInstance(instance, force=True)
753     except errors.HypervisorError, err:
754       logging.exception("Failed to stop instance")
755       return False
756
757     time.sleep(1)
758     if instance.name in GetInstanceList([hv_name]):
759       logging.error("could not shutdown instance '%s' even by destroy",
760                     instance.name)
761       return False
762
763   return True
764
765
766 def RebootInstance(instance, reboot_type, extra_args):
767   """Reboot an instance.
768
769   Args:
770     instance    - name of instance to reboot
771     reboot_type - how to reboot [soft,hard,full]
772
773   """
774   running_instances = GetInstanceList([instance.hypervisor])
775
776   if instance.name not in running_instances:
777     logging.error("Cannot reboot instance that is not running")
778     return False
779
780   hyper = hypervisor.GetHypervisor(instance.hypervisor)
781   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
782     try:
783       hyper.RebootInstance(instance)
784     except errors.HypervisorError, err:
785       logging.exception("Failed to soft reboot instance")
786       return False
787   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
788     try:
789       ShutdownInstance(instance)
790       StartInstance(instance, extra_args)
791     except errors.HypervisorError, err:
792       logging.exception("Failed to hard reboot instance")
793       return False
794   else:
795     raise errors.ParameterError("reboot_type invalid")
796
797   return True
798
799
800 def MigrateInstance(instance, target, live):
801   """Migrates an instance to another node.
802
803   @type instance: C{objects.Instance}
804   @param instance: the instance definition
805   @type target: string
806   @param target: the target node name
807   @type live: boolean
808   @param live: whether the migration should be done live or not (the
809       interpretation of this parameter is left to the hypervisor)
810   @rtype: tuple
811   @return: a tuple of (success, msg) where:
812       - succes is a boolean denoting the success/failure of the operation
813       - msg is a string with details in case of failure
814
815   """
816   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
817
818   try:
819     hyper.MigrateInstance(instance.name, target, live)
820   except errors.HypervisorError, err:
821     msg = "Failed to migrate instance: %s" % str(err)
822     logging.error(msg)
823     return (False, msg)
824   return (True, "Migration successfull")
825
826
827 def CreateBlockDevice(disk, size, owner, on_primary, info):
828   """Creates a block device for an instance.
829
830   Args:
831    disk: a ganeti.objects.Disk object
832    size: the size of the physical underlying device
833    owner: a string with the name of the instance
834    on_primary: a boolean indicating if it is the primary node or not
835    info: string that will be sent to the physical device creation
836
837   Returns:
838     the new unique_id of the device (this can sometime be
839     computed only after creation), or None. On secondary nodes,
840     it's not required to return anything.
841
842   """
843   clist = []
844   if disk.children:
845     for child in disk.children:
846       crdev = _RecursiveAssembleBD(child, owner, on_primary)
847       if on_primary or disk.AssembleOnSecondary():
848         # we need the children open in case the device itself has to
849         # be assembled
850         crdev.Open()
851       clist.append(crdev)
852   try:
853     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
854     if device is not None:
855       logging.info("removing existing device %s", disk)
856       device.Remove()
857   except errors.BlockDeviceError, err:
858     pass
859
860   device = bdev.Create(disk.dev_type, disk.physical_id,
861                        clist, size)
862   if device is None:
863     raise ValueError("Can't create child device for %s, %s" %
864                      (disk, size))
865   if on_primary or disk.AssembleOnSecondary():
866     if not device.Assemble():
867       errorstring = "Can't assemble device after creation"
868       logging.error(errorstring)
869       raise errors.BlockDeviceError("%s, very unusual event - check the node"
870                                     " daemon logs" % errorstring)
871     device.SetSyncSpeed(constants.SYNC_SPEED)
872     if on_primary or disk.OpenOnSecondary():
873       device.Open(force=True)
874     DevCacheManager.UpdateCache(device.dev_path, owner,
875                                 on_primary, disk.iv_name)
876
877   device.SetInfo(info)
878
879   physical_id = device.unique_id
880   return physical_id
881
882
883 def RemoveBlockDevice(disk):
884   """Remove a block device.
885
886   This is intended to be called recursively.
887
888   """
889   try:
890     # since we are removing the device, allow a partial match
891     # this allows removal of broken mirrors
892     rdev = _RecursiveFindBD(disk, allow_partial=True)
893   except errors.BlockDeviceError, err:
894     # probably can't attach
895     logging.info("Can't attach to device %s in remove", disk)
896     rdev = None
897   if rdev is not None:
898     r_path = rdev.dev_path
899     result = rdev.Remove()
900     if result:
901       DevCacheManager.RemoveCache(r_path)
902   else:
903     result = True
904   if disk.children:
905     for child in disk.children:
906       result = result and RemoveBlockDevice(child)
907   return result
908
909
910 def _RecursiveAssembleBD(disk, owner, as_primary):
911   """Activate a block device for an instance.
912
913   This is run on the primary and secondary nodes for an instance.
914
915   This function is called recursively.
916
917   Args:
918     disk: a objects.Disk object
919     as_primary: if we should make the block device read/write
920
921   Returns:
922     the assembled device or None (in case no device was assembled)
923
924   If the assembly is not successful, an exception is raised.
925
926   """
927   children = []
928   if disk.children:
929     mcn = disk.ChildrenNeeded()
930     if mcn == -1:
931       mcn = 0 # max number of Nones allowed
932     else:
933       mcn = len(disk.children) - mcn # max number of Nones
934     for chld_disk in disk.children:
935       try:
936         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
937       except errors.BlockDeviceError, err:
938         if children.count(None) >= mcn:
939           raise
940         cdev = None
941         logging.debug("Error in child activation: %s", str(err))
942       children.append(cdev)
943
944   if as_primary or disk.AssembleOnSecondary():
945     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
946     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
947     result = r_dev
948     if as_primary or disk.OpenOnSecondary():
949       r_dev.Open()
950     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
951                                 as_primary, disk.iv_name)
952
953   else:
954     result = True
955   return result
956
957
958 def AssembleBlockDevice(disk, owner, as_primary):
959   """Activate a block device for an instance.
960
961   This is a wrapper over _RecursiveAssembleBD.
962
963   Returns:
964     a /dev path for primary nodes
965     True for secondary nodes
966
967   """
968   result = _RecursiveAssembleBD(disk, owner, as_primary)
969   if isinstance(result, bdev.BlockDev):
970     result = result.dev_path
971   return result
972
973
974 def ShutdownBlockDevice(disk):
975   """Shut down a block device.
976
977   First, if the device is assembled (can `Attach()`), then the device
978   is shutdown. Then the children of the device are shutdown.
979
980   This function is called recursively. Note that we don't cache the
981   children or such, as oppossed to assemble, shutdown of different
982   devices doesn't require that the upper device was active.
983
984   """
985   r_dev = _RecursiveFindBD(disk)
986   if r_dev is not None:
987     r_path = r_dev.dev_path
988     result = r_dev.Shutdown()
989     if result:
990       DevCacheManager.RemoveCache(r_path)
991   else:
992     result = True
993   if disk.children:
994     for child in disk.children:
995       result = result and ShutdownBlockDevice(child)
996   return result
997
998
999 def MirrorAddChildren(parent_cdev, new_cdevs):
1000   """Extend a mirrored block device.
1001
1002   """
1003   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
1004   if parent_bdev is None:
1005     logging.error("Can't find parent device")
1006     return False
1007   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1008   if new_bdevs.count(None) > 0:
1009     logging.error("Can't find new device(s) to add: %s:%s",
1010                   new_bdevs, new_cdevs)
1011     return False
1012   parent_bdev.AddChildren(new_bdevs)
1013   return True
1014
1015
1016 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1017   """Shrink a mirrored block device.
1018
1019   """
1020   parent_bdev = _RecursiveFindBD(parent_cdev)
1021   if parent_bdev is None:
1022     logging.error("Can't find parent in remove children: %s", parent_cdev)
1023     return False
1024   devs = []
1025   for disk in new_cdevs:
1026     rpath = disk.StaticDevPath()
1027     if rpath is None:
1028       bd = _RecursiveFindBD(disk)
1029       if bd is None:
1030         logging.error("Can't find dynamic device %s while removing children",
1031                       disk)
1032         return False
1033       else:
1034         devs.append(bd.dev_path)
1035     else:
1036       devs.append(rpath)
1037   parent_bdev.RemoveChildren(devs)
1038   return True
1039
1040
1041 def GetMirrorStatus(disks):
1042   """Get the mirroring status of a list of devices.
1043
1044   Args:
1045     disks: list of `objects.Disk`
1046
1047   Returns:
1048     list of (mirror_done, estimated_time) tuples, which
1049     are the result of bdev.BlockDevice.CombinedSyncStatus()
1050
1051   """
1052   stats = []
1053   for dsk in disks:
1054     rbd = _RecursiveFindBD(dsk)
1055     if rbd is None:
1056       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1057     stats.append(rbd.CombinedSyncStatus())
1058   return stats
1059
1060
1061 def _RecursiveFindBD(disk, allow_partial=False):
1062   """Check if a device is activated.
1063
1064   If so, return informations about the real device.
1065
1066   Args:
1067     disk: the objects.Disk instance
1068     allow_partial: don't abort the find if a child of the
1069                    device can't be found; this is intended to be
1070                    used when repairing mirrors
1071
1072   Returns:
1073     None if the device can't be found
1074     otherwise the device instance
1075
1076   """
1077   children = []
1078   if disk.children:
1079     for chdisk in disk.children:
1080       children.append(_RecursiveFindBD(chdisk))
1081
1082   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1083
1084
1085 def FindBlockDevice(disk):
1086   """Check if a device is activated.
1087
1088   If so, return informations about the real device.
1089
1090   Args:
1091     disk: the objects.Disk instance
1092   Returns:
1093     None if the device can't be found
1094     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1095
1096   """
1097   rbd = _RecursiveFindBD(disk)
1098   if rbd is None:
1099     return rbd
1100   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1101
1102
1103 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1104   """Write a file to the filesystem.
1105
1106   This allows the master to overwrite(!) a file. It will only perform
1107   the operation if the file belongs to a list of configuration files.
1108
1109   """
1110   if not os.path.isabs(file_name):
1111     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1112                   file_name)
1113     return False
1114
1115   allowed_files = [
1116     constants.CLUSTER_CONF_FILE,
1117     constants.ETC_HOSTS,
1118     constants.SSH_KNOWN_HOSTS_FILE,
1119     constants.VNC_PASSWORD_FILE,
1120     ]
1121
1122   if file_name not in allowed_files:
1123     logging.error("Filename passed to UploadFile not in allowed"
1124                  " upload targets: '%s'", file_name)
1125     return False
1126
1127   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1128                   atime=atime, mtime=mtime)
1129   return True
1130
1131
1132 def _ErrnoOrStr(err):
1133   """Format an EnvironmentError exception.
1134
1135   If the `err` argument has an errno attribute, it will be looked up
1136   and converted into a textual EXXXX description. Otherwise the string
1137   representation of the error will be returned.
1138
1139   """
1140   if hasattr(err, 'errno'):
1141     detail = errno.errorcode[err.errno]
1142   else:
1143     detail = str(err)
1144   return detail
1145
1146
1147 def _OSOndiskVersion(name, os_dir):
1148   """Compute and return the API version of a given OS.
1149
1150   This function will try to read the API version of the os given by
1151   the 'name' parameter and residing in the 'os_dir' directory.
1152
1153   Return value will be either an integer denoting the version or None in the
1154   case when this is not a valid OS name.
1155
1156   """
1157   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1158
1159   try:
1160     st = os.stat(api_file)
1161   except EnvironmentError, err:
1162     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1163                            " found (%s)" % _ErrnoOrStr(err))
1164
1165   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1166     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1167                            " a regular file")
1168
1169   try:
1170     f = open(api_file)
1171     try:
1172       api_versions = f.readlines()
1173     finally:
1174       f.close()
1175   except EnvironmentError, err:
1176     raise errors.InvalidOS(name, os_dir, "error while reading the"
1177                            " API version (%s)" % _ErrnoOrStr(err))
1178
1179   api_versions = [version.strip() for version in api_versions]
1180   try:
1181     api_versions = [int(version) for version in api_versions]
1182   except (TypeError, ValueError), err:
1183     raise errors.InvalidOS(name, os_dir,
1184                            "API version is not integer (%s)" % str(err))
1185
1186   return api_versions
1187
1188
1189 def DiagnoseOS(top_dirs=None):
1190   """Compute the validity for all OSes.
1191
1192   Returns an OS object for each name in all the given top directories
1193   (if not given defaults to constants.OS_SEARCH_PATH)
1194
1195   Returns:
1196     list of OS objects
1197
1198   """
1199   if top_dirs is None:
1200     top_dirs = constants.OS_SEARCH_PATH
1201
1202   result = []
1203   for dir_name in top_dirs:
1204     if os.path.isdir(dir_name):
1205       try:
1206         f_names = utils.ListVisibleFiles(dir_name)
1207       except EnvironmentError, err:
1208         logging.exception("Can't list the OS directory %s", dir_name)
1209         break
1210       for name in f_names:
1211         try:
1212           os_inst = OSFromDisk(name, base_dir=dir_name)
1213           result.append(os_inst)
1214         except errors.InvalidOS, err:
1215           result.append(objects.OS.FromInvalidOS(err))
1216
1217   return result
1218
1219
1220 def OSFromDisk(name, base_dir=None):
1221   """Create an OS instance from disk.
1222
1223   This function will return an OS instance if the given name is a
1224   valid OS name. Otherwise, it will raise an appropriate
1225   `errors.InvalidOS` exception, detailing why this is not a valid
1226   OS.
1227
1228   Args:
1229     os_dir: Directory containing the OS scripts. Defaults to a search
1230             in all the OS_SEARCH_PATH directories.
1231
1232   """
1233
1234   if base_dir is None:
1235     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1236     if os_dir is None:
1237       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1238   else:
1239     os_dir = os.path.sep.join([base_dir, name])
1240
1241   api_versions = _OSOndiskVersion(name, os_dir)
1242
1243   if constants.OS_API_VERSION not in api_versions:
1244     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1245                            " (found %s want %s)"
1246                            % (api_versions, constants.OS_API_VERSION))
1247
1248   # OS Scripts dictionary, we will populate it with the actual script names
1249   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1250
1251   for script in os_scripts:
1252     os_scripts[script] = os.path.sep.join([os_dir, script])
1253
1254     try:
1255       st = os.stat(os_scripts[script])
1256     except EnvironmentError, err:
1257       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1258                              (script, _ErrnoOrStr(err)))
1259
1260     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1261       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1262                              script)
1263
1264     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1265       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1266                              script)
1267
1268
1269   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1270                     create_script=os_scripts['create'],
1271                     export_script=os_scripts['export'],
1272                     import_script=os_scripts['import'],
1273                     rename_script=os_scripts['rename'],
1274                     api_versions=api_versions)
1275
1276
1277 def GrowBlockDevice(disk, amount):
1278   """Grow a stack of block devices.
1279
1280   This function is called recursively, with the childrens being the
1281   first one resize.
1282
1283   Args:
1284     disk: the disk to be grown
1285
1286   Returns: a tuple of (status, result), with:
1287     status: the result (true/false) of the operation
1288     result: the error message if the operation failed, otherwise not used
1289
1290   """
1291   r_dev = _RecursiveFindBD(disk)
1292   if r_dev is None:
1293     return False, "Cannot find block device %s" % (disk,)
1294
1295   try:
1296     r_dev.Grow(amount)
1297   except errors.BlockDeviceError, err:
1298     return False, str(err)
1299
1300   return True, None
1301
1302
1303 def SnapshotBlockDevice(disk):
1304   """Create a snapshot copy of a block device.
1305
1306   This function is called recursively, and the snapshot is actually created
1307   just for the leaf lvm backend device.
1308
1309   Args:
1310     disk: the disk to be snapshotted
1311
1312   Returns:
1313     a config entry for the actual lvm device snapshotted.
1314
1315   """
1316   if disk.children:
1317     if len(disk.children) == 1:
1318       # only one child, let's recurse on it
1319       return SnapshotBlockDevice(disk.children[0])
1320     else:
1321       # more than one child, choose one that matches
1322       for child in disk.children:
1323         if child.size == disk.size:
1324           # return implies breaking the loop
1325           return SnapshotBlockDevice(child)
1326   elif disk.dev_type == constants.LD_LV:
1327     r_dev = _RecursiveFindBD(disk)
1328     if r_dev is not None:
1329       # let's stay on the safe side and ask for the full size, for now
1330       return r_dev.Snapshot(disk.size)
1331     else:
1332       return None
1333   else:
1334     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1335                                  " '%s' of type '%s'" %
1336                                  (disk.unique_id, disk.dev_type))
1337
1338
1339 def ExportSnapshot(disk, dest_node, instance, cluster_name):
1340   """Export a block device snapshot to a remote node.
1341
1342   Args:
1343     disk: the snapshot block device
1344     dest_node: the node to send the image to
1345     instance: instance being exported
1346
1347   Returns:
1348     True if successful, False otherwise.
1349
1350   """
1351   inst_os = OSFromDisk(instance.os)
1352   export_script = inst_os.export_script
1353
1354   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1355                                      instance.name, int(time.time()))
1356   if not os.path.exists(constants.LOG_OS_DIR):
1357     os.mkdir(constants.LOG_OS_DIR, 0750)
1358
1359   real_os_dev = _RecursiveFindBD(disk)
1360   if real_os_dev is None:
1361     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1362                                   str(disk))
1363   real_os_dev.Open()
1364
1365   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1366   destfile = disk.physical_id[1]
1367
1368   # the target command is built out of three individual commands,
1369   # which are joined by pipes; we check each individual command for
1370   # valid parameters
1371
1372   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1373                                export_script, instance.name,
1374                                real_os_dev.dev_path, logfile)
1375
1376   comprcmd = "gzip"
1377
1378   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1379                                 destdir, destdir, destfile)
1380   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1381                                                    constants.GANETI_RUNAS,
1382                                                    destcmd)
1383
1384   # all commands have been checked, so we're safe to combine them
1385   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1386
1387   result = utils.RunCmd(command)
1388
1389   if result.failed:
1390     logging.error("os snapshot export command '%s' returned error: %s"
1391                   " output: %s", command, result.fail_reason, result.output)
1392     return False
1393
1394   return True
1395
1396
1397 def FinalizeExport(instance, snap_disks):
1398   """Write out the export configuration information.
1399
1400   Args:
1401     instance: instance configuration
1402     snap_disks: snapshot block devices
1403
1404   Returns:
1405     False in case of error, True otherwise.
1406
1407   """
1408   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1409   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1410
1411   config = objects.SerializableConfigParser()
1412
1413   config.add_section(constants.INISECT_EXP)
1414   config.set(constants.INISECT_EXP, 'version', '0')
1415   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1416   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1417   config.set(constants.INISECT_EXP, 'os', instance.os)
1418   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1419
1420   config.add_section(constants.INISECT_INS)
1421   config.set(constants.INISECT_INS, 'name', instance.name)
1422   config.set(constants.INISECT_INS, 'memory', '%d' %
1423              instance.beparams[constants.BE_MEMORY])
1424   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1425              instance.beparams[constants.BE_VCPUS])
1426   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1427
1428   nic_count = 0
1429   for nic_count, nic in enumerate(instance.nics):
1430     config.set(constants.INISECT_INS, 'nic%d_mac' %
1431                nic_count, '%s' % nic.mac)
1432     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1433     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1434                '%s' % nic.bridge)
1435   # TODO: redundant: on load can read nics until it doesn't exist
1436   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1437
1438   disk_count = 0
1439   for disk_count, disk in enumerate(snap_disks):
1440     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1441                ('%s' % disk.iv_name))
1442     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1443                ('%s' % disk.physical_id[1]))
1444     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1445                ('%d' % disk.size))
1446   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1447
1448   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1449   cfo = open(cff, 'w')
1450   try:
1451     config.write(cfo)
1452   finally:
1453     cfo.close()
1454
1455   shutil.rmtree(finaldestdir, True)
1456   shutil.move(destdir, finaldestdir)
1457
1458   return True
1459
1460
1461 def ExportInfo(dest):
1462   """Get export configuration information.
1463
1464   Args:
1465     dest: directory containing the export
1466
1467   Returns:
1468     A serializable config file containing the export info.
1469
1470   """
1471   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1472
1473   config = objects.SerializableConfigParser()
1474   config.read(cff)
1475
1476   if (not config.has_section(constants.INISECT_EXP) or
1477       not config.has_section(constants.INISECT_INS)):
1478     return None
1479
1480   return config
1481
1482
1483 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1484                          cluster_name):
1485   """Import an os image into an instance.
1486
1487   Args:
1488     instance: the instance object
1489     os_disk: the instance-visible name of the os device
1490     swap_disk: the instance-visible name of the swap device
1491     src_node: node holding the source image
1492     src_image: path to the source image on src_node
1493
1494   Returns:
1495     False in case of error, True otherwise.
1496
1497   """
1498   inst_os = OSFromDisk(instance.os)
1499   import_script = inst_os.import_script
1500
1501   os_device = instance.FindDisk(os_disk)
1502   if os_device is None:
1503     logging.error("Can't find this device-visible name '%s'", os_disk)
1504     return False
1505
1506   swap_device = instance.FindDisk(swap_disk)
1507   if swap_device is None:
1508     logging.error("Can't find this device-visible name '%s'", swap_disk)
1509     return False
1510
1511   real_os_dev = _RecursiveFindBD(os_device)
1512   if real_os_dev is None:
1513     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1514                                   str(os_device))
1515   real_os_dev.Open()
1516
1517   real_swap_dev = _RecursiveFindBD(swap_device)
1518   if real_swap_dev is None:
1519     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1520                                   str(swap_device))
1521   real_swap_dev.Open()
1522
1523   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1524                                         instance.name, int(time.time()))
1525   if not os.path.exists(constants.LOG_OS_DIR):
1526     os.mkdir(constants.LOG_OS_DIR, 0750)
1527
1528   destcmd = utils.BuildShellCmd('cat %s', src_image)
1529   remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1530                                                    constants.GANETI_RUNAS,
1531                                                    destcmd)
1532
1533   comprcmd = "gunzip"
1534   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1535                                inst_os.path, import_script, instance.name,
1536                                real_os_dev.dev_path, real_swap_dev.dev_path,
1537                                logfile)
1538
1539   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1540   env = {'HYPERVISOR': instance.hypervisor}
1541
1542   result = utils.RunCmd(command, env=env)
1543
1544   if result.failed:
1545     logging.error("os import command '%s' returned error: %s"
1546                   " output: %s", command, result.fail_reason, result.output)
1547     return False
1548
1549   return True
1550
1551
1552 def ListExports():
1553   """Return a list of exports currently available on this machine.
1554
1555   """
1556   if os.path.isdir(constants.EXPORT_DIR):
1557     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1558   else:
1559     return []
1560
1561
1562 def RemoveExport(export):
1563   """Remove an existing export from the node.
1564
1565   Args:
1566     export: the name of the export to remove
1567
1568   Returns:
1569     False in case of error, True otherwise.
1570
1571   """
1572   target = os.path.join(constants.EXPORT_DIR, export)
1573
1574   shutil.rmtree(target)
1575   # TODO: catch some of the relevant exceptions and provide a pretty
1576   # error message if rmtree fails.
1577
1578   return True
1579
1580
1581 def RenameBlockDevices(devlist):
1582   """Rename a list of block devices.
1583
1584   The devlist argument is a list of tuples (disk, new_logical,
1585   new_physical). The return value will be a combined boolean result
1586   (True only if all renames succeeded).
1587
1588   """
1589   result = True
1590   for disk, unique_id in devlist:
1591     dev = _RecursiveFindBD(disk)
1592     if dev is None:
1593       result = False
1594       continue
1595     try:
1596       old_rpath = dev.dev_path
1597       dev.Rename(unique_id)
1598       new_rpath = dev.dev_path
1599       if old_rpath != new_rpath:
1600         DevCacheManager.RemoveCache(old_rpath)
1601         # FIXME: we should add the new cache information here, like:
1602         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1603         # but we don't have the owner here - maybe parse from existing
1604         # cache? for now, we only lose lvm data when we rename, which
1605         # is less critical than DRBD or MD
1606     except errors.BlockDeviceError, err:
1607       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1608       result = False
1609   return result
1610
1611
1612 def _TransformFileStorageDir(file_storage_dir):
1613   """Checks whether given file_storage_dir is valid.
1614
1615   Checks wheter the given file_storage_dir is within the cluster-wide
1616   default file_storage_dir stored in SimpleStore. Only paths under that
1617   directory are allowed.
1618
1619   Args:
1620     file_storage_dir: string with path
1621
1622   Returns:
1623     normalized file_storage_dir (string) if valid, None otherwise
1624
1625   """
1626   cfg = _GetConfig()
1627   file_storage_dir = os.path.normpath(file_storage_dir)
1628   base_file_storage_dir = cfg.GetFileStorageDir()
1629   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1630       base_file_storage_dir):
1631     logging.error("file storage directory '%s' is not under base file"
1632                   " storage directory '%s'",
1633                   file_storage_dir, base_file_storage_dir)
1634     return None
1635   return file_storage_dir
1636
1637
1638 def CreateFileStorageDir(file_storage_dir):
1639   """Create file storage directory.
1640
1641   Args:
1642     file_storage_dir: string containing the path
1643
1644   Returns:
1645     tuple with first element a boolean indicating wheter dir
1646     creation was successful or not
1647
1648   """
1649   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1650   result = True,
1651   if not file_storage_dir:
1652     result = False,
1653   else:
1654     if os.path.exists(file_storage_dir):
1655       if not os.path.isdir(file_storage_dir):
1656         logging.error("'%s' is not a directory", file_storage_dir)
1657         result = False,
1658     else:
1659       try:
1660         os.makedirs(file_storage_dir, 0750)
1661       except OSError, err:
1662         logging.error("Cannot create file storage directory '%s': %s",
1663                       file_storage_dir, err)
1664         result = False,
1665   return result
1666
1667
1668 def RemoveFileStorageDir(file_storage_dir):
1669   """Remove file storage directory.
1670
1671   Remove it only if it's empty. If not log an error and return.
1672
1673   Args:
1674     file_storage_dir: string containing the path
1675
1676   Returns:
1677     tuple with first element a boolean indicating wheter dir
1678     removal was successful or not
1679
1680   """
1681   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1682   result = True,
1683   if not file_storage_dir:
1684     result = False,
1685   else:
1686     if os.path.exists(file_storage_dir):
1687       if not os.path.isdir(file_storage_dir):
1688         logging.error("'%s' is not a directory", file_storage_dir)
1689         result = False,
1690       # deletes dir only if empty, otherwise we want to return False
1691       try:
1692         os.rmdir(file_storage_dir)
1693       except OSError, err:
1694         logging.exception("Cannot remove file storage directory '%s'",
1695                           file_storage_dir)
1696         result = False,
1697   return result
1698
1699
1700 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1701   """Rename the file storage directory.
1702
1703   Args:
1704     old_file_storage_dir: string containing the old path
1705     new_file_storage_dir: string containing the new path
1706
1707   Returns:
1708     tuple with first element a boolean indicating wheter dir
1709     rename was successful or not
1710
1711   """
1712   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1713   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1714   result = True,
1715   if not old_file_storage_dir or not new_file_storage_dir:
1716     result = False,
1717   else:
1718     if not os.path.exists(new_file_storage_dir):
1719       if os.path.isdir(old_file_storage_dir):
1720         try:
1721           os.rename(old_file_storage_dir, new_file_storage_dir)
1722         except OSError, err:
1723           logging.exception("Cannot rename '%s' to '%s'",
1724                             old_file_storage_dir, new_file_storage_dir)
1725           result =  False,
1726       else:
1727         logging.error("'%s' is not a directory", old_file_storage_dir)
1728         result = False,
1729     else:
1730       if os.path.exists(old_file_storage_dir):
1731         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1732                       old_file_storage_dir, new_file_storage_dir)
1733         result = False,
1734   return result
1735
1736
1737 def _IsJobQueueFile(file_name):
1738   """Checks whether the given filename is in the queue directory.
1739
1740   """
1741   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1742   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1743
1744   if not result:
1745     logging.error("'%s' is not a file in the queue directory",
1746                   file_name)
1747
1748   return result
1749
1750
1751 def JobQueueUpdate(file_name, content):
1752   """Updates a file in the queue directory.
1753
1754   """
1755   if not _IsJobQueueFile(file_name):
1756     return False
1757
1758   # Write and replace the file atomically
1759   utils.WriteFile(file_name, data=content)
1760
1761   return True
1762
1763
1764 def JobQueueRename(old, new):
1765   """Renames a job queue file.
1766
1767   """
1768   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1769     return False
1770
1771   os.rename(old, new)
1772
1773   return True
1774
1775
1776 def JobQueueSetDrainFlag(drain_flag):
1777   """Set the drain flag for the queue.
1778
1779   This will set or unset the queue drain flag.
1780
1781   @type drain_flag: bool
1782   @param drain_flag: if True, will set the drain flag, otherwise reset it.
1783
1784   """
1785   if drain_flag:
1786     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1787   else:
1788     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1789
1790   return True
1791
1792
1793 def CloseBlockDevices(disks):
1794   """Closes the given block devices.
1795
1796   This means they will be switched to secondary mode (in case of DRBD).
1797
1798   """
1799   bdevs = []
1800   for cf in disks:
1801     rd = _RecursiveFindBD(cf)
1802     if rd is None:
1803       return (False, "Can't find device %s" % cf)
1804     bdevs.append(rd)
1805
1806   msg = []
1807   for rd in bdevs:
1808     try:
1809       rd.Close()
1810     except errors.BlockDeviceError, err:
1811       msg.append(str(err))
1812   if msg:
1813     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1814   else:
1815     return (True, "All devices secondary")
1816
1817
1818 def ValidateHVParams(hvname, hvparams):
1819   """Validates the given hypervisor parameters.
1820
1821   @type hvname: string
1822   @param hvname: the hypervisor name
1823   @type hvparams: dict
1824   @param hvparams: the hypervisor parameters to be validated
1825   @rtype: tuple (bool, str)
1826   @return: tuple of (success, message)
1827
1828   """
1829   try:
1830     hv_type = hypervisor.GetHypervisor(hvname)
1831     hv_type.ValidateParameters(hvparams)
1832     return (True, "Validation passed")
1833   except errors.HypervisorError, err:
1834     return (False, str(err))
1835
1836
1837 class HooksRunner(object):
1838   """Hook runner.
1839
1840   This class is instantiated on the node side (ganeti-noded) and not on
1841   the master side.
1842
1843   """
1844   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1845
1846   def __init__(self, hooks_base_dir=None):
1847     """Constructor for hooks runner.
1848
1849     Args:
1850       - hooks_base_dir: if not None, this overrides the
1851         constants.HOOKS_BASE_DIR (useful for unittests)
1852
1853     """
1854     if hooks_base_dir is None:
1855       hooks_base_dir = constants.HOOKS_BASE_DIR
1856     self._BASE_DIR = hooks_base_dir
1857
1858   @staticmethod
1859   def ExecHook(script, env):
1860     """Exec one hook script.
1861
1862     Args:
1863      - script: the full path to the script
1864      - env: the environment with which to exec the script
1865
1866     """
1867     # exec the process using subprocess and log the output
1868     fdstdin = None
1869     try:
1870       fdstdin = open("/dev/null", "r")
1871       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1872                                stderr=subprocess.STDOUT, close_fds=True,
1873                                shell=False, cwd="/", env=env)
1874       output = ""
1875       try:
1876         output = child.stdout.read(4096)
1877         child.stdout.close()
1878       except EnvironmentError, err:
1879         output += "Hook script error: %s" % str(err)
1880
1881       while True:
1882         try:
1883           result = child.wait()
1884           break
1885         except EnvironmentError, err:
1886           if err.errno == errno.EINTR:
1887             continue
1888           raise
1889     finally:
1890       # try not to leak fds
1891       for fd in (fdstdin, ):
1892         if fd is not None:
1893           try:
1894             fd.close()
1895           except EnvironmentError, err:
1896             # just log the error
1897             #logging.exception("Error while closing fd %s", fd)
1898             pass
1899
1900     return result == 0, output
1901
1902   def RunHooks(self, hpath, phase, env):
1903     """Run the scripts in the hooks directory.
1904
1905     This method will not be usually overriden by child opcodes.
1906
1907     """
1908     if phase == constants.HOOKS_PHASE_PRE:
1909       suffix = "pre"
1910     elif phase == constants.HOOKS_PHASE_POST:
1911       suffix = "post"
1912     else:
1913       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1914     rr = []
1915
1916     subdir = "%s-%s.d" % (hpath, suffix)
1917     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1918     try:
1919       dir_contents = utils.ListVisibleFiles(dir_name)
1920     except OSError, err:
1921       # must log
1922       return rr
1923
1924     # we use the standard python sort order,
1925     # so 00name is the recommended naming scheme
1926     dir_contents.sort()
1927     for relname in dir_contents:
1928       fname = os.path.join(dir_name, relname)
1929       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1930           self.RE_MASK.match(relname) is not None):
1931         rrval = constants.HKR_SKIP
1932         output = ""
1933       else:
1934         result, output = self.ExecHook(fname, env)
1935         if not result:
1936           rrval = constants.HKR_FAIL
1937         else:
1938           rrval = constants.HKR_SUCCESS
1939       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1940
1941     return rr
1942
1943
1944 class IAllocatorRunner(object):
1945   """IAllocator runner.
1946
1947   This class is instantiated on the node side (ganeti-noded) and not on
1948   the master side.
1949
1950   """
1951   def Run(self, name, idata):
1952     """Run an iallocator script.
1953
1954     Return value: tuple of:
1955        - run status (one of the IARUN_ constants)
1956        - stdout
1957        - stderr
1958        - fail reason (as from utils.RunResult)
1959
1960     """
1961     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1962                                   os.path.isfile)
1963     if alloc_script is None:
1964       return (constants.IARUN_NOTFOUND, None, None, None)
1965
1966     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1967     try:
1968       os.write(fd, idata)
1969       os.close(fd)
1970       result = utils.RunCmd([alloc_script, fin_name])
1971       if result.failed:
1972         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1973                 result.fail_reason)
1974     finally:
1975       os.unlink(fin_name)
1976
1977     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1978
1979
1980 class DevCacheManager(object):
1981   """Simple class for managing a cache of block device information.
1982
1983   """
1984   _DEV_PREFIX = "/dev/"
1985   _ROOT_DIR = constants.BDEV_CACHE_DIR
1986
1987   @classmethod
1988   def _ConvertPath(cls, dev_path):
1989     """Converts a /dev/name path to the cache file name.
1990
1991     This replaces slashes with underscores and strips the /dev
1992     prefix. It then returns the full path to the cache file
1993
1994     """
1995     if dev_path.startswith(cls._DEV_PREFIX):
1996       dev_path = dev_path[len(cls._DEV_PREFIX):]
1997     dev_path = dev_path.replace("/", "_")
1998     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1999     return fpath
2000
2001   @classmethod
2002   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2003     """Updates the cache information for a given device.
2004
2005     """
2006     if dev_path is None:
2007       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2008       return
2009     fpath = cls._ConvertPath(dev_path)
2010     if on_primary:
2011       state = "primary"
2012     else:
2013       state = "secondary"
2014     if iv_name is None:
2015       iv_name = "not_visible"
2016     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2017     try:
2018       utils.WriteFile(fpath, data=fdata)
2019     except EnvironmentError, err:
2020       logging.exception("Can't update bdev cache for %s", dev_path)
2021
2022   @classmethod
2023   def RemoveCache(cls, dev_path):
2024     """Remove data for a dev_path.
2025
2026     """
2027     if dev_path is None:
2028       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2029       return
2030     fpath = cls._ConvertPath(dev_path)
2031     try:
2032       utils.RemoveFile(fpath)
2033     except EnvironmentError, err:
2034       logging.exception("Can't update bdev cache for %s", dev_path)