Export the hypervisor.ValidateParameters over RPC
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetConfig():
48   return ssconf.SimpleConfigReader()
49
50
51 def _GetSshRunner(cluster_name):
52   return ssh.SshRunner(cluster_name)
53
54
55 def _CleanDirectory(path, exclude=[]):
56   """Removes all regular files in a directory.
57
58   @param exclude: List of files to be excluded.
59   @type exclude: list
60
61   """
62   if not os.path.isdir(path):
63     return
64
65   # Normalize excluded paths
66   exclude = [os.path.normpath(i) for i in exclude]
67
68   for rel_name in utils.ListVisibleFiles(path):
69     full_name = os.path.normpath(os.path.join(path, rel_name))
70     if full_name in exclude:
71       continue
72     if os.path.isfile(full_name) and not os.path.islink(full_name):
73       utils.RemoveFile(full_name)
74
75
76 def JobQueuePurge():
77   """Removes job queue files and archived jobs
78
79   """
80   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82
83
84 def GetMasterInfo():
85   """Returns master information.
86
87   This is an utility function to compute master information, either
88   for consumption here or from the node daemon.
89
90   @rtype: tuple
91   @return: (master_netdev, master_ip, master_name)
92
93   """
94   try:
95     cfg = _GetConfig()
96     master_netdev = cfg.GetMasterNetdev()
97     master_ip = cfg.GetMasterIP()
98     master_node = cfg.GetMasterNode()
99   except errors.ConfigurationError, err:
100     logging.exception("Cluster configuration incomplete")
101     return (None, None)
102   return (master_netdev, master_ip, master_node)
103
104
105 def StartMaster(start_daemons):
106   """Activate local node as master node.
107
108   The function will always try activate the IP address of the master
109   (if someone else has it, then it won't). Then, if the start_daemons
110   parameter is True, it will also start the master daemons
111   (ganet-masterd and ganeti-rapi).
112
113   """
114   ok = True
115   master_netdev, master_ip, _ = GetMasterInfo()
116   if not master_netdev:
117     return False
118
119   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120     if utils.OwnIpAddress(master_ip):
121       # we already have the ip:
122       logging.debug("Already started")
123     else:
124       logging.error("Someone else has the master ip, not activating")
125       ok = False
126   else:
127     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
128                            "dev", master_netdev, "label",
129                            "%s:0" % master_netdev])
130     if result.failed:
131       logging.error("Can't activate master IP: %s", result.output)
132       ok = False
133
134     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
135                            "-s", master_ip, master_ip])
136     # we'll ignore the exit code of arping
137
138   # and now start the master and rapi daemons
139   if start_daemons:
140     for daemon in 'ganeti-masterd', 'ganeti-rapi':
141       result = utils.RunCmd([daemon])
142       if result.failed:
143         logging.error("Can't start daemon %s: %s", daemon, result.output)
144         ok = False
145   return ok
146
147
148 def StopMaster(stop_daemons):
149   """Deactivate this node as master.
150
151   The function will always try to deactivate the IP address of the
152   master. Then, if the stop_daemons parameter is True, it will also
153   stop the master daemons (ganet-masterd and ganeti-rapi).
154
155   """
156   master_netdev, master_ip, _ = GetMasterInfo()
157   if not master_netdev:
158     return False
159
160   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
161                          "dev", master_netdev])
162   if result.failed:
163     logging.error("Can't remove the master IP, error: %s", result.output)
164     # but otherwise ignore the failure
165
166   if stop_daemons:
167     # stop/kill the rapi and the master daemon
168     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
169       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
170
171   return True
172
173
174 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175   """Joins this node to the cluster.
176
177   This does the following:
178       - updates the hostkeys of the machine (rsa and dsa)
179       - adds the ssh private key to the user
180       - adds the ssh public key to the users' authorized_keys file
181
182   """
183   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187   for name, content, mode in sshd_keys:
188     utils.WriteFile(name, data=content, mode=mode)
189
190   try:
191     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
192                                                     mkdir=True)
193   except errors.OpExecError, err:
194     logging.exception("Error while processing user ssh files")
195     return False
196
197   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198     utils.WriteFile(name, data=content, mode=0600)
199
200   utils.AddAuthorizedKey(auth_keys, sshpub)
201
202   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
203
204   return True
205
206
207 def LeaveCluster():
208   """Cleans up the current node and prepares it to be removed from the cluster.
209
210   """
211   _CleanDirectory(constants.DATA_DIR)
212   JobQueuePurge()
213
214   try:
215     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
216   except errors.OpExecError:
217     logging.exception("Error while processing ssh files")
218     return
219
220   f = open(pub_key, 'r')
221   try:
222     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
223   finally:
224     f.close()
225
226   utils.RemoveFile(priv_key)
227   utils.RemoveFile(pub_key)
228
229   # Return a reassuring string to the caller, and quit
230   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231
232
233 def GetNodeInfo(vgname, hypervisor_type):
234   """Gives back a hash with different informations about the node.
235
236   @type vgname: C{string}
237   @param vgname: the name of the volume group to ask for disk space information
238   @type hypervisor_type: C{str}
239   @param hypervisor_type: the name of the hypervisor to ask for
240       memory information
241   @rtype: C{dict}
242   @return: dictionary with the following keys:
243       - vg_size is the size of the configured volume group in MiB
244       - vg_free is the free size of the volume group in MiB
245       - memory_dom0 is the memory allocated for domain0 in MiB
246       - memory_free is the currently available (free) ram in MiB
247       - memory_total is the total number of ram in MiB
248
249   """
250   outputarray = {}
251   vginfo = _GetVGInfo(vgname)
252   outputarray['vg_size'] = vginfo['vg_size']
253   outputarray['vg_free'] = vginfo['vg_free']
254
255   hyper = hypervisor.GetHypervisor(hypervisor_type)
256   hyp_info = hyper.GetNodeInfo()
257   if hyp_info is not None:
258     outputarray.update(hyp_info)
259
260   f = open("/proc/sys/kernel/random/boot_id", 'r')
261   try:
262     outputarray["bootid"] = f.read(128).rstrip("\n")
263   finally:
264     f.close()
265
266   return outputarray
267
268
269 def VerifyNode(what, cluster_name):
270   """Verify the status of the local node.
271
272   Based on the input L{what} parameter, various checks are done on the
273   local node.
274
275   If the I{filelist} key is present, this list of
276   files is checksummed and the file/checksum pairs are returned.
277
278   If the I{nodelist} key is present, we check that we have
279   connectivity via ssh with the target nodes (and check the hostname
280   report).
281
282   If the I{node-net-test} key is present, we check that we have
283   connectivity to the given nodes via both primary IP and, if
284   applicable, secondary IPs.
285
286   @type what: C{dict}
287   @param what: a dictionary of things to check:
288       - filelist: list of files for which to compute checksums
289       - nodelist: list of nodes we should check ssh communication with
290       - node-net-test: list of nodes we should check node daemon port
291         connectivity with
292       - hypervisor: list with hypervisors to run the verify for
293
294   """
295   result = {}
296
297   if 'hypervisor' in what:
298     result['hypervisor'] = my_dict = {}
299     for hv_name in what['hypervisor']:
300       my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
301
302   if 'filelist' in what:
303     result['filelist'] = utils.FingerprintFiles(what['filelist'])
304
305   if 'nodelist' in what:
306     result['nodelist'] = {}
307     random.shuffle(what['nodelist'])
308     for node in what['nodelist']:
309       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
310       if not success:
311         result['nodelist'][node] = message
312   if 'node-net-test' in what:
313     result['node-net-test'] = {}
314     my_name = utils.HostInfo().name
315     my_pip = my_sip = None
316     for name, pip, sip in what['node-net-test']:
317       if name == my_name:
318         my_pip = pip
319         my_sip = sip
320         break
321     if not my_pip:
322       result['node-net-test'][my_name] = ("Can't find my own"
323                                           " primary/secondary IP"
324                                           " in the node list")
325     else:
326       port = utils.GetNodeDaemonPort()
327       for name, pip, sip in what['node-net-test']:
328         fail = []
329         if not utils.TcpPing(pip, port, source=my_pip):
330           fail.append("primary")
331         if sip != pip:
332           if not utils.TcpPing(sip, port, source=my_sip):
333             fail.append("secondary")
334         if fail:
335           result['node-net-test'][name] = ("failure using the %s"
336                                            " interface(s)" %
337                                            " and ".join(fail))
338
339   return result
340
341
342 def GetVolumeList(vg_name):
343   """Compute list of logical volumes and their size.
344
345   Returns:
346     dictionary of all partions (key) with their size (in MiB), inactive
347     and online status:
348     {'test1': ('20.06', True, True)}
349
350   """
351   lvs = {}
352   sep = '|'
353   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
354                          "--separator=%s" % sep,
355                          "-olv_name,lv_size,lv_attr", vg_name])
356   if result.failed:
357     logging.error("Failed to list logical volumes, lvs output: %s",
358                   result.output)
359     return result.output
360
361   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
362   for line in result.stdout.splitlines():
363     line = line.strip()
364     match = valid_line_re.match(line)
365     if not match:
366       logging.error("Invalid line returned from lvs output: '%s'", line)
367       continue
368     name, size, attr = match.groups()
369     inactive = attr[4] == '-'
370     online = attr[5] == 'o'
371     lvs[name] = (size, inactive, online)
372
373   return lvs
374
375
376 def ListVolumeGroups():
377   """List the volume groups and their size.
378
379   Returns:
380     Dictionary with keys volume name and values the size of the volume
381
382   """
383   return utils.ListVolumeGroups()
384
385
386 def NodeVolumes():
387   """List all volumes on this node.
388
389   """
390   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
391                          "--separator=|",
392                          "--options=lv_name,lv_size,devices,vg_name"])
393   if result.failed:
394     logging.error("Failed to list logical volumes, lvs output: %s",
395                   result.output)
396     return {}
397
398   def parse_dev(dev):
399     if '(' in dev:
400       return dev.split('(')[0]
401     else:
402       return dev
403
404   def map_line(line):
405     return {
406       'name': line[0].strip(),
407       'size': line[1].strip(),
408       'dev': parse_dev(line[2].strip()),
409       'vg': line[3].strip(),
410     }
411
412   return [map_line(line.split('|')) for line in result.stdout.splitlines()
413           if line.count('|') >= 3]
414
415
416 def BridgesExist(bridges_list):
417   """Check if a list of bridges exist on the current node.
418
419   Returns:
420     True if all of them exist, false otherwise
421
422   """
423   for bridge in bridges_list:
424     if not utils.BridgeExists(bridge):
425       return False
426
427   return True
428
429
430 def GetInstanceList(hypervisor_list):
431   """Provides a list of instances.
432
433   @type hypervisor_list: list
434   @param hypervisor_list: the list of hypervisors to query information
435
436   @rtype: list
437   @return: a list of all running instances on the current node
438              - instance1.example.com
439              - instance2.example.com
440
441   """
442   results = []
443   for hname in hypervisor_list:
444     try:
445       names = hypervisor.GetHypervisor(hname).ListInstances()
446       results.extend(names)
447     except errors.HypervisorError, err:
448       logging.exception("Error enumerating instances for hypevisor %s", hname)
449       # FIXME: should we somehow not propagate this to the master?
450       raise
451
452   return results
453
454
455 def GetInstanceInfo(instance, hname):
456   """Gives back the informations about an instance as a dictionary.
457
458   @type instance: string
459   @param instance: the instance name
460   @type hname: string
461   @param hname: the hypervisor type of the instance
462
463   @rtype: dict
464   @return: dictionary with the following keys:
465       - memory: memory size of instance (int)
466       - state: xen state of instance (string)
467       - time: cpu time of instance (float)
468
469   """
470   output = {}
471
472   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
473   if iinfo is not None:
474     output['memory'] = iinfo[2]
475     output['state'] = iinfo[4]
476     output['time'] = iinfo[5]
477
478   return output
479
480
481 def GetAllInstancesInfo(hypervisor_list):
482   """Gather data about all instances.
483
484   This is the equivalent of `GetInstanceInfo()`, except that it
485   computes data for all instances at once, thus being faster if one
486   needs data about more than one instance.
487
488   @type hypervisor_list: list
489   @param hypervisor_list: list of hypervisors to query for instance data
490
491   @rtype: dict of dicts
492   @return: dictionary of instance: data, with data having the following keys:
493       - memory: memory size of instance (int)
494       - state: xen state of instance (string)
495       - time: cpu time of instance (float)
496       - vcpuus: the number of vcpus
497
498   """
499   output = {}
500
501   for hname in hypervisor_list:
502     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
503     if iinfo:
504       for name, inst_id, memory, vcpus, state, times in iinfo:
505         if name in output:
506           raise errors.HypervisorError("Instance %s running duplicate" % name)
507         output[name] = {
508           'memory': memory,
509           'vcpus': vcpus,
510           'state': state,
511           'time': times,
512           }
513
514   return output
515
516
517 def AddOSToInstance(instance, os_disk, swap_disk):
518   """Add an OS to an instance.
519
520   Args:
521     instance: the instance object
522     os_disk: the instance-visible name of the os device
523     swap_disk: the instance-visible name of the swap device
524
525   """
526   inst_os = OSFromDisk(instance.os)
527
528   create_script = inst_os.create_script
529
530   os_device = instance.FindDisk(os_disk)
531   if os_device is None:
532     logging.error("Can't find this device-visible name '%s'", os_disk)
533     return False
534
535   swap_device = instance.FindDisk(swap_disk)
536   if swap_device is None:
537     logging.error("Can't find this device-visible name '%s'", swap_disk)
538     return False
539
540   real_os_dev = _RecursiveFindBD(os_device)
541   if real_os_dev is None:
542     raise errors.BlockDeviceError("Block device '%s' is not set up" %
543                                   str(os_device))
544   real_os_dev.Open()
545
546   real_swap_dev = _RecursiveFindBD(swap_device)
547   if real_swap_dev is None:
548     raise errors.BlockDeviceError("Block device '%s' is not set up" %
549                                   str(swap_device))
550   real_swap_dev.Open()
551
552   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
553                                      instance.name, int(time.time()))
554   if not os.path.exists(constants.LOG_OS_DIR):
555     os.mkdir(constants.LOG_OS_DIR, 0750)
556
557   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
558                                 inst_os.path, create_script, instance.name,
559                                 real_os_dev.dev_path, real_swap_dev.dev_path,
560                                 logfile)
561   env = {'HYPERVISOR': instance.hypervisor}
562
563   result = utils.RunCmd(command, env=env)
564   if result.failed:
565     logging.error("os create command '%s' returned error: %s, logfile: %s,"
566                   " output: %s", command, result.fail_reason, logfile,
567                   result.output)
568     return False
569
570   return True
571
572
573 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
574   """Run the OS rename script for an instance.
575
576   Args:
577     instance: the instance object
578     old_name: the old name of the instance
579     os_disk: the instance-visible name of the os device
580     swap_disk: the instance-visible name of the swap device
581
582   """
583   inst_os = OSFromDisk(instance.os)
584
585   script = inst_os.rename_script
586
587   os_device = instance.FindDisk(os_disk)
588   if os_device is None:
589     logging.error("Can't find this device-visible name '%s'", os_disk)
590     return False
591
592   swap_device = instance.FindDisk(swap_disk)
593   if swap_device is None:
594     logging.error("Can't find this device-visible name '%s'", swap_disk)
595     return False
596
597   real_os_dev = _RecursiveFindBD(os_device)
598   if real_os_dev is None:
599     raise errors.BlockDeviceError("Block device '%s' is not set up" %
600                                   str(os_device))
601   real_os_dev.Open()
602
603   real_swap_dev = _RecursiveFindBD(swap_device)
604   if real_swap_dev is None:
605     raise errors.BlockDeviceError("Block device '%s' is not set up" %
606                                   str(swap_device))
607   real_swap_dev.Open()
608
609   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
610                                            old_name,
611                                            instance.name, int(time.time()))
612   if not os.path.exists(constants.LOG_OS_DIR):
613     os.mkdir(constants.LOG_OS_DIR, 0750)
614
615   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
616                                 inst_os.path, script, old_name, instance.name,
617                                 real_os_dev.dev_path, real_swap_dev.dev_path,
618                                 logfile)
619
620   result = utils.RunCmd(command)
621
622   if result.failed:
623     logging.error("os create command '%s' returned error: %s output: %s",
624                   command, result.fail_reason, result.output)
625     return False
626
627   return True
628
629
630 def _GetVGInfo(vg_name):
631   """Get informations about the volume group.
632
633   Args:
634     vg_name: the volume group
635
636   Returns:
637     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
638     where
639     vg_size is the total size of the volume group in MiB
640     vg_free is the free size of the volume group in MiB
641     pv_count are the number of physical disks in that vg
642
643   If an error occurs during gathering of data, we return the same dict
644   with keys all set to None.
645
646   """
647   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
648
649   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
650                          "--nosuffix", "--units=m", "--separator=:", vg_name])
651
652   if retval.failed:
653     logging.error("volume group %s not present", vg_name)
654     return retdic
655   valarr = retval.stdout.strip().rstrip(':').split(':')
656   if len(valarr) == 3:
657     try:
658       retdic = {
659         "vg_size": int(round(float(valarr[0]), 0)),
660         "vg_free": int(round(float(valarr[1]), 0)),
661         "pv_count": int(valarr[2]),
662         }
663     except ValueError, err:
664       logging.exception("Fail to parse vgs output")
665   else:
666     logging.error("vgs output has the wrong number of fields (expected"
667                   " three): %s", str(valarr))
668   return retdic
669
670
671 def _GatherBlockDevs(instance):
672   """Set up an instance's block device(s).
673
674   This is run on the primary node at instance startup. The block
675   devices must be already assembled.
676
677   """
678   block_devices = []
679   for disk in instance.disks:
680     device = _RecursiveFindBD(disk)
681     if device is None:
682       raise errors.BlockDeviceError("Block device '%s' is not set up." %
683                                     str(disk))
684     device.Open()
685     block_devices.append((disk, device))
686   return block_devices
687
688
689 def StartInstance(instance, extra_args):
690   """Start an instance.
691
692   @type instance: instance object
693   @param instance: the instance object
694   @rtype: boolean
695   @return: whether the startup was successful or not
696
697   """
698   running_instances = GetInstanceList([instance.hypervisor])
699
700   if instance.name in running_instances:
701     return True
702
703   block_devices = _GatherBlockDevs(instance)
704   hyper = hypervisor.GetHypervisor(instance.hypervisor)
705
706   try:
707     hyper.StartInstance(instance, block_devices, extra_args)
708   except errors.HypervisorError, err:
709     logging.exception("Failed to start instance")
710     return False
711
712   return True
713
714
715 def ShutdownInstance(instance):
716   """Shut an instance down.
717
718   @type instance: instance object
719   @param instance: the instance object
720   @rtype: boolean
721   @return: whether the startup was successful or not
722
723   """
724   hv_name = instance.hypervisor
725   running_instances = GetInstanceList([hv_name])
726
727   if instance.name not in running_instances:
728     return True
729
730   hyper = hypervisor.GetHypervisor(hv_name)
731   try:
732     hyper.StopInstance(instance)
733   except errors.HypervisorError, err:
734     logging.error("Failed to stop instance")
735     return False
736
737   # test every 10secs for 2min
738   shutdown_ok = False
739
740   time.sleep(1)
741   for dummy in range(11):
742     if instance.name not in GetInstanceList([hv_name]):
743       break
744     time.sleep(10)
745   else:
746     # the shutdown did not succeed
747     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
748
749     try:
750       hyper.StopInstance(instance, force=True)
751     except errors.HypervisorError, err:
752       logging.exception("Failed to stop instance")
753       return False
754
755     time.sleep(1)
756     if instance.name in GetInstanceList([hv_name]):
757       logging.error("could not shutdown instance '%s' even by destroy",
758                     instance.name)
759       return False
760
761   return True
762
763
764 def RebootInstance(instance, reboot_type, extra_args):
765   """Reboot an instance.
766
767   Args:
768     instance    - name of instance to reboot
769     reboot_type - how to reboot [soft,hard,full]
770
771   """
772   running_instances = GetInstanceList([instance.hypervisor])
773
774   if instance.name not in running_instances:
775     logging.error("Cannot reboot instance that is not running")
776     return False
777
778   hyper = hypervisor.GetHypervisor(instance.hypervisor)
779   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
780     try:
781       hyper.RebootInstance(instance)
782     except errors.HypervisorError, err:
783       logging.exception("Failed to soft reboot instance")
784       return False
785   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
786     try:
787       ShutdownInstance(instance)
788       StartInstance(instance, extra_args)
789     except errors.HypervisorError, err:
790       logging.exception("Failed to hard reboot instance")
791       return False
792   else:
793     raise errors.ParameterError("reboot_type invalid")
794
795   return True
796
797
798 def MigrateInstance(instance, target, live):
799   """Migrates an instance to another node.
800
801   @type instance: C{objects.Instance}
802   @param instance: the instance definition
803   @type target: string
804   @param target: the target node name
805   @type live: boolean
806   @param live: whether the migration should be done live or not (the
807       interpretation of this parameter is left to the hypervisor)
808   @rtype: tuple
809   @return: a tuple of (success, msg) where:
810       - succes is a boolean denoting the success/failure of the operation
811       - msg is a string with details in case of failure
812
813   """
814   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
815
816   try:
817     hyper.MigrateInstance(instance.name, target, live)
818   except errors.HypervisorError, err:
819     msg = "Failed to migrate instance: %s" % str(err)
820     logging.error(msg)
821     return (False, msg)
822   return (True, "Migration successfull")
823
824
825 def CreateBlockDevice(disk, size, owner, on_primary, info):
826   """Creates a block device for an instance.
827
828   Args:
829    disk: a ganeti.objects.Disk object
830    size: the size of the physical underlying device
831    owner: a string with the name of the instance
832    on_primary: a boolean indicating if it is the primary node or not
833    info: string that will be sent to the physical device creation
834
835   Returns:
836     the new unique_id of the device (this can sometime be
837     computed only after creation), or None. On secondary nodes,
838     it's not required to return anything.
839
840   """
841   clist = []
842   if disk.children:
843     for child in disk.children:
844       crdev = _RecursiveAssembleBD(child, owner, on_primary)
845       if on_primary or disk.AssembleOnSecondary():
846         # we need the children open in case the device itself has to
847         # be assembled
848         crdev.Open()
849       clist.append(crdev)
850   try:
851     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
852     if device is not None:
853       logging.info("removing existing device %s", disk)
854       device.Remove()
855   except errors.BlockDeviceError, err:
856     pass
857
858   device = bdev.Create(disk.dev_type, disk.physical_id,
859                        clist, size)
860   if device is None:
861     raise ValueError("Can't create child device for %s, %s" %
862                      (disk, size))
863   if on_primary or disk.AssembleOnSecondary():
864     if not device.Assemble():
865       errorstring = "Can't assemble device after creation"
866       logging.error(errorstring)
867       raise errors.BlockDeviceError("%s, very unusual event - check the node"
868                                     " daemon logs" % errorstring)
869     device.SetSyncSpeed(constants.SYNC_SPEED)
870     if on_primary or disk.OpenOnSecondary():
871       device.Open(force=True)
872     DevCacheManager.UpdateCache(device.dev_path, owner,
873                                 on_primary, disk.iv_name)
874
875   device.SetInfo(info)
876
877   physical_id = device.unique_id
878   return physical_id
879
880
881 def RemoveBlockDevice(disk):
882   """Remove a block device.
883
884   This is intended to be called recursively.
885
886   """
887   try:
888     # since we are removing the device, allow a partial match
889     # this allows removal of broken mirrors
890     rdev = _RecursiveFindBD(disk, allow_partial=True)
891   except errors.BlockDeviceError, err:
892     # probably can't attach
893     logging.info("Can't attach to device %s in remove", disk)
894     rdev = None
895   if rdev is not None:
896     r_path = rdev.dev_path
897     result = rdev.Remove()
898     if result:
899       DevCacheManager.RemoveCache(r_path)
900   else:
901     result = True
902   if disk.children:
903     for child in disk.children:
904       result = result and RemoveBlockDevice(child)
905   return result
906
907
908 def _RecursiveAssembleBD(disk, owner, as_primary):
909   """Activate a block device for an instance.
910
911   This is run on the primary and secondary nodes for an instance.
912
913   This function is called recursively.
914
915   Args:
916     disk: a objects.Disk object
917     as_primary: if we should make the block device read/write
918
919   Returns:
920     the assembled device or None (in case no device was assembled)
921
922   If the assembly is not successful, an exception is raised.
923
924   """
925   children = []
926   if disk.children:
927     mcn = disk.ChildrenNeeded()
928     if mcn == -1:
929       mcn = 0 # max number of Nones allowed
930     else:
931       mcn = len(disk.children) - mcn # max number of Nones
932     for chld_disk in disk.children:
933       try:
934         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
935       except errors.BlockDeviceError, err:
936         if children.count(None) >= mcn:
937           raise
938         cdev = None
939         logging.debug("Error in child activation: %s", str(err))
940       children.append(cdev)
941
942   if as_primary or disk.AssembleOnSecondary():
943     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
944     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
945     result = r_dev
946     if as_primary or disk.OpenOnSecondary():
947       r_dev.Open()
948     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
949                                 as_primary, disk.iv_name)
950
951   else:
952     result = True
953   return result
954
955
956 def AssembleBlockDevice(disk, owner, as_primary):
957   """Activate a block device for an instance.
958
959   This is a wrapper over _RecursiveAssembleBD.
960
961   Returns:
962     a /dev path for primary nodes
963     True for secondary nodes
964
965   """
966   result = _RecursiveAssembleBD(disk, owner, as_primary)
967   if isinstance(result, bdev.BlockDev):
968     result = result.dev_path
969   return result
970
971
972 def ShutdownBlockDevice(disk):
973   """Shut down a block device.
974
975   First, if the device is assembled (can `Attach()`), then the device
976   is shutdown. Then the children of the device are shutdown.
977
978   This function is called recursively. Note that we don't cache the
979   children or such, as oppossed to assemble, shutdown of different
980   devices doesn't require that the upper device was active.
981
982   """
983   r_dev = _RecursiveFindBD(disk)
984   if r_dev is not None:
985     r_path = r_dev.dev_path
986     result = r_dev.Shutdown()
987     if result:
988       DevCacheManager.RemoveCache(r_path)
989   else:
990     result = True
991   if disk.children:
992     for child in disk.children:
993       result = result and ShutdownBlockDevice(child)
994   return result
995
996
997 def MirrorAddChildren(parent_cdev, new_cdevs):
998   """Extend a mirrored block device.
999
1000   """
1001   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
1002   if parent_bdev is None:
1003     logging.error("Can't find parent device")
1004     return False
1005   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1006   if new_bdevs.count(None) > 0:
1007     logging.error("Can't find new device(s) to add: %s:%s",
1008                   new_bdevs, new_cdevs)
1009     return False
1010   parent_bdev.AddChildren(new_bdevs)
1011   return True
1012
1013
1014 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1015   """Shrink a mirrored block device.
1016
1017   """
1018   parent_bdev = _RecursiveFindBD(parent_cdev)
1019   if parent_bdev is None:
1020     logging.error("Can't find parent in remove children: %s", parent_cdev)
1021     return False
1022   devs = []
1023   for disk in new_cdevs:
1024     rpath = disk.StaticDevPath()
1025     if rpath is None:
1026       bd = _RecursiveFindBD(disk)
1027       if bd is None:
1028         logging.error("Can't find dynamic device %s while removing children",
1029                       disk)
1030         return False
1031       else:
1032         devs.append(bd.dev_path)
1033     else:
1034       devs.append(rpath)
1035   parent_bdev.RemoveChildren(devs)
1036   return True
1037
1038
1039 def GetMirrorStatus(disks):
1040   """Get the mirroring status of a list of devices.
1041
1042   Args:
1043     disks: list of `objects.Disk`
1044
1045   Returns:
1046     list of (mirror_done, estimated_time) tuples, which
1047     are the result of bdev.BlockDevice.CombinedSyncStatus()
1048
1049   """
1050   stats = []
1051   for dsk in disks:
1052     rbd = _RecursiveFindBD(dsk)
1053     if rbd is None:
1054       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1055     stats.append(rbd.CombinedSyncStatus())
1056   return stats
1057
1058
1059 def _RecursiveFindBD(disk, allow_partial=False):
1060   """Check if a device is activated.
1061
1062   If so, return informations about the real device.
1063
1064   Args:
1065     disk: the objects.Disk instance
1066     allow_partial: don't abort the find if a child of the
1067                    device can't be found; this is intended to be
1068                    used when repairing mirrors
1069
1070   Returns:
1071     None if the device can't be found
1072     otherwise the device instance
1073
1074   """
1075   children = []
1076   if disk.children:
1077     for chdisk in disk.children:
1078       children.append(_RecursiveFindBD(chdisk))
1079
1080   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1081
1082
1083 def FindBlockDevice(disk):
1084   """Check if a device is activated.
1085
1086   If so, return informations about the real device.
1087
1088   Args:
1089     disk: the objects.Disk instance
1090   Returns:
1091     None if the device can't be found
1092     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1093
1094   """
1095   rbd = _RecursiveFindBD(disk)
1096   if rbd is None:
1097     return rbd
1098   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1099
1100
1101 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1102   """Write a file to the filesystem.
1103
1104   This allows the master to overwrite(!) a file. It will only perform
1105   the operation if the file belongs to a list of configuration files.
1106
1107   """
1108   if not os.path.isabs(file_name):
1109     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1110                   file_name)
1111     return False
1112
1113   allowed_files = [
1114     constants.CLUSTER_CONF_FILE,
1115     constants.ETC_HOSTS,
1116     constants.SSH_KNOWN_HOSTS_FILE,
1117     constants.VNC_PASSWORD_FILE,
1118     ]
1119
1120   if file_name not in allowed_files:
1121     logging.error("Filename passed to UploadFile not in allowed"
1122                  " upload targets: '%s'", file_name)
1123     return False
1124
1125   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1126                   atime=atime, mtime=mtime)
1127   return True
1128
1129
1130 def _ErrnoOrStr(err):
1131   """Format an EnvironmentError exception.
1132
1133   If the `err` argument has an errno attribute, it will be looked up
1134   and converted into a textual EXXXX description. Otherwise the string
1135   representation of the error will be returned.
1136
1137   """
1138   if hasattr(err, 'errno'):
1139     detail = errno.errorcode[err.errno]
1140   else:
1141     detail = str(err)
1142   return detail
1143
1144
1145 def _OSOndiskVersion(name, os_dir):
1146   """Compute and return the API version of a given OS.
1147
1148   This function will try to read the API version of the os given by
1149   the 'name' parameter and residing in the 'os_dir' directory.
1150
1151   Return value will be either an integer denoting the version or None in the
1152   case when this is not a valid OS name.
1153
1154   """
1155   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1156
1157   try:
1158     st = os.stat(api_file)
1159   except EnvironmentError, err:
1160     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1161                            " found (%s)" % _ErrnoOrStr(err))
1162
1163   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1164     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1165                            " a regular file")
1166
1167   try:
1168     f = open(api_file)
1169     try:
1170       api_versions = f.readlines()
1171     finally:
1172       f.close()
1173   except EnvironmentError, err:
1174     raise errors.InvalidOS(name, os_dir, "error while reading the"
1175                            " API version (%s)" % _ErrnoOrStr(err))
1176
1177   api_versions = [version.strip() for version in api_versions]
1178   try:
1179     api_versions = [int(version) for version in api_versions]
1180   except (TypeError, ValueError), err:
1181     raise errors.InvalidOS(name, os_dir,
1182                            "API version is not integer (%s)" % str(err))
1183
1184   return api_versions
1185
1186
1187 def DiagnoseOS(top_dirs=None):
1188   """Compute the validity for all OSes.
1189
1190   Returns an OS object for each name in all the given top directories
1191   (if not given defaults to constants.OS_SEARCH_PATH)
1192
1193   Returns:
1194     list of OS objects
1195
1196   """
1197   if top_dirs is None:
1198     top_dirs = constants.OS_SEARCH_PATH
1199
1200   result = []
1201   for dir_name in top_dirs:
1202     if os.path.isdir(dir_name):
1203       try:
1204         f_names = utils.ListVisibleFiles(dir_name)
1205       except EnvironmentError, err:
1206         logging.exception("Can't list the OS directory %s", dir_name)
1207         break
1208       for name in f_names:
1209         try:
1210           os_inst = OSFromDisk(name, base_dir=dir_name)
1211           result.append(os_inst)
1212         except errors.InvalidOS, err:
1213           result.append(objects.OS.FromInvalidOS(err))
1214
1215   return result
1216
1217
1218 def OSFromDisk(name, base_dir=None):
1219   """Create an OS instance from disk.
1220
1221   This function will return an OS instance if the given name is a
1222   valid OS name. Otherwise, it will raise an appropriate
1223   `errors.InvalidOS` exception, detailing why this is not a valid
1224   OS.
1225
1226   Args:
1227     os_dir: Directory containing the OS scripts. Defaults to a search
1228             in all the OS_SEARCH_PATH directories.
1229
1230   """
1231
1232   if base_dir is None:
1233     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1234     if os_dir is None:
1235       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1236   else:
1237     os_dir = os.path.sep.join([base_dir, name])
1238
1239   api_versions = _OSOndiskVersion(name, os_dir)
1240
1241   if constants.OS_API_VERSION not in api_versions:
1242     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1243                            " (found %s want %s)"
1244                            % (api_versions, constants.OS_API_VERSION))
1245
1246   # OS Scripts dictionary, we will populate it with the actual script names
1247   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1248
1249   for script in os_scripts:
1250     os_scripts[script] = os.path.sep.join([os_dir, script])
1251
1252     try:
1253       st = os.stat(os_scripts[script])
1254     except EnvironmentError, err:
1255       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1256                              (script, _ErrnoOrStr(err)))
1257
1258     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1259       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1260                              script)
1261
1262     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1263       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1264                              script)
1265
1266
1267   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1268                     create_script=os_scripts['create'],
1269                     export_script=os_scripts['export'],
1270                     import_script=os_scripts['import'],
1271                     rename_script=os_scripts['rename'],
1272                     api_versions=api_versions)
1273
1274
1275 def GrowBlockDevice(disk, amount):
1276   """Grow a stack of block devices.
1277
1278   This function is called recursively, with the childrens being the
1279   first one resize.
1280
1281   Args:
1282     disk: the disk to be grown
1283
1284   Returns: a tuple of (status, result), with:
1285     status: the result (true/false) of the operation
1286     result: the error message if the operation failed, otherwise not used
1287
1288   """
1289   r_dev = _RecursiveFindBD(disk)
1290   if r_dev is None:
1291     return False, "Cannot find block device %s" % (disk,)
1292
1293   try:
1294     r_dev.Grow(amount)
1295   except errors.BlockDeviceError, err:
1296     return False, str(err)
1297
1298   return True, None
1299
1300
1301 def SnapshotBlockDevice(disk):
1302   """Create a snapshot copy of a block device.
1303
1304   This function is called recursively, and the snapshot is actually created
1305   just for the leaf lvm backend device.
1306
1307   Args:
1308     disk: the disk to be snapshotted
1309
1310   Returns:
1311     a config entry for the actual lvm device snapshotted.
1312
1313   """
1314   if disk.children:
1315     if len(disk.children) == 1:
1316       # only one child, let's recurse on it
1317       return SnapshotBlockDevice(disk.children[0])
1318     else:
1319       # more than one child, choose one that matches
1320       for child in disk.children:
1321         if child.size == disk.size:
1322           # return implies breaking the loop
1323           return SnapshotBlockDevice(child)
1324   elif disk.dev_type == constants.LD_LV:
1325     r_dev = _RecursiveFindBD(disk)
1326     if r_dev is not None:
1327       # let's stay on the safe side and ask for the full size, for now
1328       return r_dev.Snapshot(disk.size)
1329     else:
1330       return None
1331   else:
1332     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1333                                  " '%s' of type '%s'" %
1334                                  (disk.unique_id, disk.dev_type))
1335
1336
1337 def ExportSnapshot(disk, dest_node, instance, cluster_name):
1338   """Export a block device snapshot to a remote node.
1339
1340   Args:
1341     disk: the snapshot block device
1342     dest_node: the node to send the image to
1343     instance: instance being exported
1344
1345   Returns:
1346     True if successful, False otherwise.
1347
1348   """
1349   inst_os = OSFromDisk(instance.os)
1350   export_script = inst_os.export_script
1351
1352   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1353                                      instance.name, int(time.time()))
1354   if not os.path.exists(constants.LOG_OS_DIR):
1355     os.mkdir(constants.LOG_OS_DIR, 0750)
1356
1357   real_os_dev = _RecursiveFindBD(disk)
1358   if real_os_dev is None:
1359     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1360                                   str(disk))
1361   real_os_dev.Open()
1362
1363   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1364   destfile = disk.physical_id[1]
1365
1366   # the target command is built out of three individual commands,
1367   # which are joined by pipes; we check each individual command for
1368   # valid parameters
1369
1370   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1371                                export_script, instance.name,
1372                                real_os_dev.dev_path, logfile)
1373
1374   comprcmd = "gzip"
1375
1376   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1377                                 destdir, destdir, destfile)
1378   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1379                                                    constants.GANETI_RUNAS,
1380                                                    destcmd)
1381
1382   # all commands have been checked, so we're safe to combine them
1383   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1384
1385   result = utils.RunCmd(command)
1386
1387   if result.failed:
1388     logging.error("os snapshot export command '%s' returned error: %s"
1389                   " output: %s", command, result.fail_reason, result.output)
1390     return False
1391
1392   return True
1393
1394
1395 def FinalizeExport(instance, snap_disks):
1396   """Write out the export configuration information.
1397
1398   Args:
1399     instance: instance configuration
1400     snap_disks: snapshot block devices
1401
1402   Returns:
1403     False in case of error, True otherwise.
1404
1405   """
1406   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1407   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1408
1409   config = objects.SerializableConfigParser()
1410
1411   config.add_section(constants.INISECT_EXP)
1412   config.set(constants.INISECT_EXP, 'version', '0')
1413   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1414   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1415   config.set(constants.INISECT_EXP, 'os', instance.os)
1416   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1417
1418   config.add_section(constants.INISECT_INS)
1419   config.set(constants.INISECT_INS, 'name', instance.name)
1420   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1421   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1422   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1423
1424   nic_count = 0
1425   for nic_count, nic in enumerate(instance.nics):
1426     config.set(constants.INISECT_INS, 'nic%d_mac' %
1427                nic_count, '%s' % nic.mac)
1428     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1429     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1430                '%s' % nic.bridge)
1431   # TODO: redundant: on load can read nics until it doesn't exist
1432   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1433
1434   disk_count = 0
1435   for disk_count, disk in enumerate(snap_disks):
1436     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1437                ('%s' % disk.iv_name))
1438     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1439                ('%s' % disk.physical_id[1]))
1440     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1441                ('%d' % disk.size))
1442   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1443
1444   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1445   cfo = open(cff, 'w')
1446   try:
1447     config.write(cfo)
1448   finally:
1449     cfo.close()
1450
1451   shutil.rmtree(finaldestdir, True)
1452   shutil.move(destdir, finaldestdir)
1453
1454   return True
1455
1456
1457 def ExportInfo(dest):
1458   """Get export configuration information.
1459
1460   Args:
1461     dest: directory containing the export
1462
1463   Returns:
1464     A serializable config file containing the export info.
1465
1466   """
1467   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1468
1469   config = objects.SerializableConfigParser()
1470   config.read(cff)
1471
1472   if (not config.has_section(constants.INISECT_EXP) or
1473       not config.has_section(constants.INISECT_INS)):
1474     return None
1475
1476   return config
1477
1478
1479 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1480                          cluster_name):
1481   """Import an os image into an instance.
1482
1483   Args:
1484     instance: the instance object
1485     os_disk: the instance-visible name of the os device
1486     swap_disk: the instance-visible name of the swap device
1487     src_node: node holding the source image
1488     src_image: path to the source image on src_node
1489
1490   Returns:
1491     False in case of error, True otherwise.
1492
1493   """
1494   inst_os = OSFromDisk(instance.os)
1495   import_script = inst_os.import_script
1496
1497   os_device = instance.FindDisk(os_disk)
1498   if os_device is None:
1499     logging.error("Can't find this device-visible name '%s'", os_disk)
1500     return False
1501
1502   swap_device = instance.FindDisk(swap_disk)
1503   if swap_device is None:
1504     logging.error("Can't find this device-visible name '%s'", swap_disk)
1505     return False
1506
1507   real_os_dev = _RecursiveFindBD(os_device)
1508   if real_os_dev is None:
1509     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1510                                   str(os_device))
1511   real_os_dev.Open()
1512
1513   real_swap_dev = _RecursiveFindBD(swap_device)
1514   if real_swap_dev is None:
1515     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1516                                   str(swap_device))
1517   real_swap_dev.Open()
1518
1519   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1520                                         instance.name, int(time.time()))
1521   if not os.path.exists(constants.LOG_OS_DIR):
1522     os.mkdir(constants.LOG_OS_DIR, 0750)
1523
1524   destcmd = utils.BuildShellCmd('cat %s', src_image)
1525   remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1526                                                    constants.GANETI_RUNAS,
1527                                                    destcmd)
1528
1529   comprcmd = "gunzip"
1530   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1531                                inst_os.path, import_script, instance.name,
1532                                real_os_dev.dev_path, real_swap_dev.dev_path,
1533                                logfile)
1534
1535   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1536   env = {'HYPERVISOR': instance.hypervisor}
1537
1538   result = utils.RunCmd(command, env=env)
1539
1540   if result.failed:
1541     logging.error("os import command '%s' returned error: %s"
1542                   " output: %s", command, result.fail_reason, result.output)
1543     return False
1544
1545   return True
1546
1547
1548 def ListExports():
1549   """Return a list of exports currently available on this machine.
1550
1551   """
1552   if os.path.isdir(constants.EXPORT_DIR):
1553     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1554   else:
1555     return []
1556
1557
1558 def RemoveExport(export):
1559   """Remove an existing export from the node.
1560
1561   Args:
1562     export: the name of the export to remove
1563
1564   Returns:
1565     False in case of error, True otherwise.
1566
1567   """
1568   target = os.path.join(constants.EXPORT_DIR, export)
1569
1570   shutil.rmtree(target)
1571   # TODO: catch some of the relevant exceptions and provide a pretty
1572   # error message if rmtree fails.
1573
1574   return True
1575
1576
1577 def RenameBlockDevices(devlist):
1578   """Rename a list of block devices.
1579
1580   The devlist argument is a list of tuples (disk, new_logical,
1581   new_physical). The return value will be a combined boolean result
1582   (True only if all renames succeeded).
1583
1584   """
1585   result = True
1586   for disk, unique_id in devlist:
1587     dev = _RecursiveFindBD(disk)
1588     if dev is None:
1589       result = False
1590       continue
1591     try:
1592       old_rpath = dev.dev_path
1593       dev.Rename(unique_id)
1594       new_rpath = dev.dev_path
1595       if old_rpath != new_rpath:
1596         DevCacheManager.RemoveCache(old_rpath)
1597         # FIXME: we should add the new cache information here, like:
1598         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1599         # but we don't have the owner here - maybe parse from existing
1600         # cache? for now, we only lose lvm data when we rename, which
1601         # is less critical than DRBD or MD
1602     except errors.BlockDeviceError, err:
1603       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1604       result = False
1605   return result
1606
1607
1608 def _TransformFileStorageDir(file_storage_dir):
1609   """Checks whether given file_storage_dir is valid.
1610
1611   Checks wheter the given file_storage_dir is within the cluster-wide
1612   default file_storage_dir stored in SimpleStore. Only paths under that
1613   directory are allowed.
1614
1615   Args:
1616     file_storage_dir: string with path
1617
1618   Returns:
1619     normalized file_storage_dir (string) if valid, None otherwise
1620
1621   """
1622   cfg = _GetConfig()
1623   file_storage_dir = os.path.normpath(file_storage_dir)
1624   base_file_storage_dir = cfg.GetFileStorageDir()
1625   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1626       base_file_storage_dir):
1627     logging.error("file storage directory '%s' is not under base file"
1628                   " storage directory '%s'",
1629                   file_storage_dir, base_file_storage_dir)
1630     return None
1631   return file_storage_dir
1632
1633
1634 def CreateFileStorageDir(file_storage_dir):
1635   """Create file storage directory.
1636
1637   Args:
1638     file_storage_dir: string containing the path
1639
1640   Returns:
1641     tuple with first element a boolean indicating wheter dir
1642     creation was successful or not
1643
1644   """
1645   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1646   result = True,
1647   if not file_storage_dir:
1648     result = False,
1649   else:
1650     if os.path.exists(file_storage_dir):
1651       if not os.path.isdir(file_storage_dir):
1652         logging.error("'%s' is not a directory", file_storage_dir)
1653         result = False,
1654     else:
1655       try:
1656         os.makedirs(file_storage_dir, 0750)
1657       except OSError, err:
1658         logging.error("Cannot create file storage directory '%s': %s",
1659                       file_storage_dir, err)
1660         result = False,
1661   return result
1662
1663
1664 def RemoveFileStorageDir(file_storage_dir):
1665   """Remove file storage directory.
1666
1667   Remove it only if it's empty. If not log an error and return.
1668
1669   Args:
1670     file_storage_dir: string containing the path
1671
1672   Returns:
1673     tuple with first element a boolean indicating wheter dir
1674     removal was successful or not
1675
1676   """
1677   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1678   result = True,
1679   if not file_storage_dir:
1680     result = False,
1681   else:
1682     if os.path.exists(file_storage_dir):
1683       if not os.path.isdir(file_storage_dir):
1684         logging.error("'%s' is not a directory", file_storage_dir)
1685         result = False,
1686       # deletes dir only if empty, otherwise we want to return False
1687       try:
1688         os.rmdir(file_storage_dir)
1689       except OSError, err:
1690         logging.exception("Cannot remove file storage directory '%s'",
1691                           file_storage_dir)
1692         result = False,
1693   return result
1694
1695
1696 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1697   """Rename the file storage directory.
1698
1699   Args:
1700     old_file_storage_dir: string containing the old path
1701     new_file_storage_dir: string containing the new path
1702
1703   Returns:
1704     tuple with first element a boolean indicating wheter dir
1705     rename was successful or not
1706
1707   """
1708   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1709   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1710   result = True,
1711   if not old_file_storage_dir or not new_file_storage_dir:
1712     result = False,
1713   else:
1714     if not os.path.exists(new_file_storage_dir):
1715       if os.path.isdir(old_file_storage_dir):
1716         try:
1717           os.rename(old_file_storage_dir, new_file_storage_dir)
1718         except OSError, err:
1719           logging.exception("Cannot rename '%s' to '%s'",
1720                             old_file_storage_dir, new_file_storage_dir)
1721           result =  False,
1722       else:
1723         logging.error("'%s' is not a directory", old_file_storage_dir)
1724         result = False,
1725     else:
1726       if os.path.exists(old_file_storage_dir):
1727         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1728                       old_file_storage_dir, new_file_storage_dir)
1729         result = False,
1730   return result
1731
1732
1733 def _IsJobQueueFile(file_name):
1734   """Checks whether the given filename is in the queue directory.
1735
1736   """
1737   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1738   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1739
1740   if not result:
1741     logging.error("'%s' is not a file in the queue directory",
1742                   file_name)
1743
1744   return result
1745
1746
1747 def JobQueueUpdate(file_name, content):
1748   """Updates a file in the queue directory.
1749
1750   """
1751   if not _IsJobQueueFile(file_name):
1752     return False
1753
1754   # Write and replace the file atomically
1755   utils.WriteFile(file_name, data=content)
1756
1757   return True
1758
1759
1760 def JobQueueRename(old, new):
1761   """Renames a job queue file.
1762
1763   """
1764   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1765     return False
1766
1767   os.rename(old, new)
1768
1769   return True
1770
1771
1772 def CloseBlockDevices(disks):
1773   """Closes the given block devices.
1774
1775   This means they will be switched to secondary mode (in case of DRBD).
1776
1777   """
1778   bdevs = []
1779   for cf in disks:
1780     rd = _RecursiveFindBD(cf)
1781     if rd is None:
1782       return (False, "Can't find device %s" % cf)
1783     bdevs.append(rd)
1784
1785   msg = []
1786   for rd in bdevs:
1787     try:
1788       rd.Close()
1789     except errors.BlockDeviceError, err:
1790       msg.append(str(err))
1791   if msg:
1792     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1793   else:
1794     return (True, "All devices secondary")
1795
1796
1797 def ValidateHVParams(hvname, hvparams):
1798   """Validates the given hypervisor parameters.
1799
1800   @type hvname: string
1801   @param hvname: the hypervisor name
1802   @type hvparams: dict
1803   @param hvparams: the hypervisor parameters to be validated
1804   @rtype: tuple (bool, str)
1805   @return: tuple of (success, message)
1806
1807   """
1808   try:
1809     hv_type = hypervisor.GetHypervisor(hvname)
1810     hv_type.ValidateParameters(hvparams)
1811     return (True, "Validation passed")
1812   except errors.HypervisorError, err:
1813     return (False, str(err))
1814
1815
1816 class HooksRunner(object):
1817   """Hook runner.
1818
1819   This class is instantiated on the node side (ganeti-noded) and not on
1820   the master side.
1821
1822   """
1823   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1824
1825   def __init__(self, hooks_base_dir=None):
1826     """Constructor for hooks runner.
1827
1828     Args:
1829       - hooks_base_dir: if not None, this overrides the
1830         constants.HOOKS_BASE_DIR (useful for unittests)
1831
1832     """
1833     if hooks_base_dir is None:
1834       hooks_base_dir = constants.HOOKS_BASE_DIR
1835     self._BASE_DIR = hooks_base_dir
1836
1837   @staticmethod
1838   def ExecHook(script, env):
1839     """Exec one hook script.
1840
1841     Args:
1842      - script: the full path to the script
1843      - env: the environment with which to exec the script
1844
1845     """
1846     # exec the process using subprocess and log the output
1847     fdstdin = None
1848     try:
1849       fdstdin = open("/dev/null", "r")
1850       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1851                                stderr=subprocess.STDOUT, close_fds=True,
1852                                shell=False, cwd="/", env=env)
1853       output = ""
1854       try:
1855         output = child.stdout.read(4096)
1856         child.stdout.close()
1857       except EnvironmentError, err:
1858         output += "Hook script error: %s" % str(err)
1859
1860       while True:
1861         try:
1862           result = child.wait()
1863           break
1864         except EnvironmentError, err:
1865           if err.errno == errno.EINTR:
1866             continue
1867           raise
1868     finally:
1869       # try not to leak fds
1870       for fd in (fdstdin, ):
1871         if fd is not None:
1872           try:
1873             fd.close()
1874           except EnvironmentError, err:
1875             # just log the error
1876             #logging.exception("Error while closing fd %s", fd)
1877             pass
1878
1879     return result == 0, output
1880
1881   def RunHooks(self, hpath, phase, env):
1882     """Run the scripts in the hooks directory.
1883
1884     This method will not be usually overriden by child opcodes.
1885
1886     """
1887     if phase == constants.HOOKS_PHASE_PRE:
1888       suffix = "pre"
1889     elif phase == constants.HOOKS_PHASE_POST:
1890       suffix = "post"
1891     else:
1892       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1893     rr = []
1894
1895     subdir = "%s-%s.d" % (hpath, suffix)
1896     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1897     try:
1898       dir_contents = utils.ListVisibleFiles(dir_name)
1899     except OSError, err:
1900       # must log
1901       return rr
1902
1903     # we use the standard python sort order,
1904     # so 00name is the recommended naming scheme
1905     dir_contents.sort()
1906     for relname in dir_contents:
1907       fname = os.path.join(dir_name, relname)
1908       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1909           self.RE_MASK.match(relname) is not None):
1910         rrval = constants.HKR_SKIP
1911         output = ""
1912       else:
1913         result, output = self.ExecHook(fname, env)
1914         if not result:
1915           rrval = constants.HKR_FAIL
1916         else:
1917           rrval = constants.HKR_SUCCESS
1918       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1919
1920     return rr
1921
1922
1923 class IAllocatorRunner(object):
1924   """IAllocator runner.
1925
1926   This class is instantiated on the node side (ganeti-noded) and not on
1927   the master side.
1928
1929   """
1930   def Run(self, name, idata):
1931     """Run an iallocator script.
1932
1933     Return value: tuple of:
1934        - run status (one of the IARUN_ constants)
1935        - stdout
1936        - stderr
1937        - fail reason (as from utils.RunResult)
1938
1939     """
1940     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1941                                   os.path.isfile)
1942     if alloc_script is None:
1943       return (constants.IARUN_NOTFOUND, None, None, None)
1944
1945     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1946     try:
1947       os.write(fd, idata)
1948       os.close(fd)
1949       result = utils.RunCmd([alloc_script, fin_name])
1950       if result.failed:
1951         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1952                 result.fail_reason)
1953     finally:
1954       os.unlink(fin_name)
1955
1956     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1957
1958
1959 class DevCacheManager(object):
1960   """Simple class for managing a cache of block device information.
1961
1962   """
1963   _DEV_PREFIX = "/dev/"
1964   _ROOT_DIR = constants.BDEV_CACHE_DIR
1965
1966   @classmethod
1967   def _ConvertPath(cls, dev_path):
1968     """Converts a /dev/name path to the cache file name.
1969
1970     This replaces slashes with underscores and strips the /dev
1971     prefix. It then returns the full path to the cache file
1972
1973     """
1974     if dev_path.startswith(cls._DEV_PREFIX):
1975       dev_path = dev_path[len(cls._DEV_PREFIX):]
1976     dev_path = dev_path.replace("/", "_")
1977     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1978     return fpath
1979
1980   @classmethod
1981   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1982     """Updates the cache information for a given device.
1983
1984     """
1985     if dev_path is None:
1986       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1987       return
1988     fpath = cls._ConvertPath(dev_path)
1989     if on_primary:
1990       state = "primary"
1991     else:
1992       state = "secondary"
1993     if iv_name is None:
1994       iv_name = "not_visible"
1995     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1996     try:
1997       utils.WriteFile(fpath, data=fdata)
1998     except EnvironmentError, err:
1999       logging.exception("Can't update bdev cache for %s", dev_path)
2000
2001   @classmethod
2002   def RemoveCache(cls, dev_path):
2003     """Remove data for a dev_path.
2004
2005     """
2006     if dev_path is None:
2007       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2008       return
2009     fpath = cls._ConvertPath(dev_path)
2010     try:
2011       utils.RemoveFile(fpath)
2012     except EnvironmentError, err:
2013       logging.exception("Can't update bdev cache for %s", dev_path)