Some documentation updates
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetConfig():
48   return ssconf.SimpleConfigReader()
49
50
51 def _GetSshRunner(cluster_name):
52   return ssh.SshRunner(cluster_name)
53
54
55 def _CleanDirectory(path, exclude=[]):
56   """Removes all regular files in a directory.
57
58   @param exclude: List of files to be excluded.
59   @type exclude: list
60
61   """
62   if not os.path.isdir(path):
63     return
64
65   # Normalize excluded paths
66   exclude = [os.path.normpath(i) for i in exclude]
67
68   for rel_name in utils.ListVisibleFiles(path):
69     full_name = os.path.normpath(os.path.join(path, rel_name))
70     if full_name in exclude:
71       continue
72     if os.path.isfile(full_name) and not os.path.islink(full_name):
73       utils.RemoveFile(full_name)
74
75
76 def JobQueuePurge():
77   """Removes job queue files and archived jobs
78
79   """
80   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82
83
84 def GetMasterInfo():
85   """Returns master information.
86
87   This is an utility function to compute master information, either
88   for consumption here or from the node daemon.
89
90   @rtype: tuple
91   @return: (master_netdev, master_ip, master_name)
92
93   """
94   try:
95     cfg = _GetConfig()
96     master_netdev = cfg.GetMasterNetdev()
97     master_ip = cfg.GetMasterIP()
98     master_node = cfg.GetMasterNode()
99   except errors.ConfigurationError, err:
100     logging.exception("Cluster configuration incomplete")
101     return (None, None)
102   return (master_netdev, master_ip, master_node)
103
104
105 def StartMaster(start_daemons):
106   """Activate local node as master node.
107
108   The function will always try activate the IP address of the master
109   (if someone else has it, then it won't). Then, if the start_daemons
110   parameter is True, it will also start the master daemons
111   (ganet-masterd and ganeti-rapi).
112
113   """
114   ok = True
115   master_netdev, master_ip, _ = GetMasterInfo()
116   if not master_netdev:
117     return False
118
119   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120     if utils.OwnIpAddress(master_ip):
121       # we already have the ip:
122       logging.debug("Already started")
123     else:
124       logging.error("Someone else has the master ip, not activating")
125       ok = False
126   else:
127     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
128                            "dev", master_netdev, "label",
129                            "%s:0" % master_netdev])
130     if result.failed:
131       logging.error("Can't activate master IP: %s", result.output)
132       ok = False
133
134     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
135                            "-s", master_ip, master_ip])
136     # we'll ignore the exit code of arping
137
138   # and now start the master and rapi daemons
139   if start_daemons:
140     for daemon in 'ganeti-masterd', 'ganeti-rapi':
141       result = utils.RunCmd([daemon])
142       if result.failed:
143         logging.error("Can't start daemon %s: %s", daemon, result.output)
144         ok = False
145   return ok
146
147
148 def StopMaster(stop_daemons):
149   """Deactivate this node as master.
150
151   The function will always try to deactivate the IP address of the
152   master. Then, if the stop_daemons parameter is True, it will also
153   stop the master daemons (ganet-masterd and ganeti-rapi).
154
155   """
156   master_netdev, master_ip, _ = GetMasterInfo()
157   if not master_netdev:
158     return False
159
160   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
161                          "dev", master_netdev])
162   if result.failed:
163     logging.error("Can't remove the master IP, error: %s", result.output)
164     # but otherwise ignore the failure
165
166   if stop_daemons:
167     # stop/kill the rapi and the master daemon
168     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
169       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
170
171   return True
172
173
174 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175   """Joins this node to the cluster.
176
177   This does the following:
178       - updates the hostkeys of the machine (rsa and dsa)
179       - adds the ssh private key to the user
180       - adds the ssh public key to the users' authorized_keys file
181
182   """
183   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187   for name, content, mode in sshd_keys:
188     utils.WriteFile(name, data=content, mode=mode)
189
190   try:
191     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
192                                                     mkdir=True)
193   except errors.OpExecError, err:
194     logging.exception("Error while processing user ssh files")
195     return False
196
197   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198     utils.WriteFile(name, data=content, mode=0600)
199
200   utils.AddAuthorizedKey(auth_keys, sshpub)
201
202   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
203
204   return True
205
206
207 def LeaveCluster():
208   """Cleans up the current node and prepares it to be removed from the cluster.
209
210   """
211   _CleanDirectory(constants.DATA_DIR)
212   JobQueuePurge()
213
214   try:
215     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
216   except errors.OpExecError:
217     logging.exception("Error while processing ssh files")
218     return
219
220   f = open(pub_key, 'r')
221   try:
222     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
223   finally:
224     f.close()
225
226   utils.RemoveFile(priv_key)
227   utils.RemoveFile(pub_key)
228
229   # Return a reassuring string to the caller, and quit
230   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231
232
233 def GetNodeInfo(vgname, hypervisor_type):
234   """Gives back a hash with different informations about the node.
235
236   @type vgname: C{string}
237   @param vgname: the name of the volume group to ask for disk space information
238   @type hypervisor_type: C{str}
239   @param hypervisor_type: the name of the hypervisor to ask for
240       memory information
241   @rtype: C{dict}
242   @return: dictionary with the following keys:
243       - vg_size is the size of the configured volume group in MiB
244       - vg_free is the free size of the volume group in MiB
245       - memory_dom0 is the memory allocated for domain0 in MiB
246       - memory_free is the currently available (free) ram in MiB
247       - memory_total is the total number of ram in MiB
248
249   """
250   outputarray = {}
251   vginfo = _GetVGInfo(vgname)
252   outputarray['vg_size'] = vginfo['vg_size']
253   outputarray['vg_free'] = vginfo['vg_free']
254
255   hyper = hypervisor.GetHypervisor(hypervisor_type)
256   hyp_info = hyper.GetNodeInfo()
257   if hyp_info is not None:
258     outputarray.update(hyp_info)
259
260   f = open("/proc/sys/kernel/random/boot_id", 'r')
261   try:
262     outputarray["bootid"] = f.read(128).rstrip("\n")
263   finally:
264     f.close()
265
266   return outputarray
267
268
269 def VerifyNode(what, cluster_name):
270   """Verify the status of the local node.
271
272   Based on the input L{what} parameter, various checks are done on the
273   local node.
274
275   If the I{filelist} key is present, this list of
276   files is checksummed and the file/checksum pairs are returned.
277
278   If the I{nodelist} key is present, we check that we have
279   connectivity via ssh with the target nodes (and check the hostname
280   report).
281
282   If the I{node-net-test} key is present, we check that we have
283   connectivity to the given nodes via both primary IP and, if
284   applicable, secondary IPs.
285
286   @type what: C{dict}
287   @param what: a dictionary of things to check:
288       - filelist: list of files for which to compute checksums
289       - nodelist: list of nodes we should check ssh communication with
290       - node-net-test: list of nodes we should check node daemon port
291         connectivity with
292       - hypervisor: list with hypervisors to run the verify for
293
294   """
295   result = {}
296
297   if 'hypervisor' in what:
298     result['hypervisor'] = my_dict = {}
299     for hv_name in what['hypervisor']:
300       my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
301
302   if 'filelist' in what:
303     result['filelist'] = utils.FingerprintFiles(what['filelist'])
304
305   if 'nodelist' in what:
306     result['nodelist'] = {}
307     random.shuffle(what['nodelist'])
308     for node in what['nodelist']:
309       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
310       if not success:
311         result['nodelist'][node] = message
312   if 'node-net-test' in what:
313     result['node-net-test'] = {}
314     my_name = utils.HostInfo().name
315     my_pip = my_sip = None
316     for name, pip, sip in what['node-net-test']:
317       if name == my_name:
318         my_pip = pip
319         my_sip = sip
320         break
321     if not my_pip:
322       result['node-net-test'][my_name] = ("Can't find my own"
323                                           " primary/secondary IP"
324                                           " in the node list")
325     else:
326       port = utils.GetNodeDaemonPort()
327       for name, pip, sip in what['node-net-test']:
328         fail = []
329         if not utils.TcpPing(pip, port, source=my_pip):
330           fail.append("primary")
331         if sip != pip:
332           if not utils.TcpPing(sip, port, source=my_sip):
333             fail.append("secondary")
334         if fail:
335           result['node-net-test'][name] = ("failure using the %s"
336                                            " interface(s)" %
337                                            " and ".join(fail))
338
339   return result
340
341
342 def GetVolumeList(vg_name):
343   """Compute list of logical volumes and their size.
344
345   Returns:
346     dictionary of all partions (key) with their size (in MiB), inactive
347     and online status:
348     {'test1': ('20.06', True, True)}
349
350   """
351   lvs = {}
352   sep = '|'
353   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
354                          "--separator=%s" % sep,
355                          "-olv_name,lv_size,lv_attr", vg_name])
356   if result.failed:
357     logging.error("Failed to list logical volumes, lvs output: %s",
358                   result.output)
359     return result.output
360
361   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
362   for line in result.stdout.splitlines():
363     line = line.strip()
364     match = valid_line_re.match(line)
365     if not match:
366       logging.error("Invalid line returned from lvs output: '%s'", line)
367       continue
368     name, size, attr = match.groups()
369     inactive = attr[4] == '-'
370     online = attr[5] == 'o'
371     lvs[name] = (size, inactive, online)
372
373   return lvs
374
375
376 def ListVolumeGroups():
377   """List the volume groups and their size.
378
379   Returns:
380     Dictionary with keys volume name and values the size of the volume
381
382   """
383   return utils.ListVolumeGroups()
384
385
386 def NodeVolumes():
387   """List all volumes on this node.
388
389   """
390   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
391                          "--separator=|",
392                          "--options=lv_name,lv_size,devices,vg_name"])
393   if result.failed:
394     logging.error("Failed to list logical volumes, lvs output: %s",
395                   result.output)
396     return {}
397
398   def parse_dev(dev):
399     if '(' in dev:
400       return dev.split('(')[0]
401     else:
402       return dev
403
404   def map_line(line):
405     return {
406       'name': line[0].strip(),
407       'size': line[1].strip(),
408       'dev': parse_dev(line[2].strip()),
409       'vg': line[3].strip(),
410     }
411
412   return [map_line(line.split('|')) for line in result.stdout.splitlines()
413           if line.count('|') >= 3]
414
415
416 def BridgesExist(bridges_list):
417   """Check if a list of bridges exist on the current node.
418
419   @rtype: boolean
420   @return: C{True} if all of them exist, C{False} otherwise
421
422   """
423   for bridge in bridges_list:
424     if not utils.BridgeExists(bridge):
425       return False
426
427   return True
428
429
430 def GetInstanceList(hypervisor_list):
431   """Provides a list of instances.
432
433   @type hypervisor_list: list
434   @param hypervisor_list: the list of hypervisors to query information
435
436   @rtype: list
437   @return: a list of all running instances on the current node
438              - instance1.example.com
439              - instance2.example.com
440
441   """
442   results = []
443   for hname in hypervisor_list:
444     try:
445       names = hypervisor.GetHypervisor(hname).ListInstances()
446       results.extend(names)
447     except errors.HypervisorError, err:
448       logging.exception("Error enumerating instances for hypevisor %s", hname)
449       # FIXME: should we somehow not propagate this to the master?
450       raise
451
452   return results
453
454
455 def GetInstanceInfo(instance, hname):
456   """Gives back the informations about an instance as a dictionary.
457
458   @type instance: string
459   @param instance: the instance name
460   @type hname: string
461   @param hname: the hypervisor type of the instance
462
463   @rtype: dict
464   @return: dictionary with the following keys:
465       - memory: memory size of instance (int)
466       - state: xen state of instance (string)
467       - time: cpu time of instance (float)
468
469   """
470   output = {}
471
472   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
473   if iinfo is not None:
474     output['memory'] = iinfo[2]
475     output['state'] = iinfo[4]
476     output['time'] = iinfo[5]
477
478   return output
479
480
481 def GetAllInstancesInfo(hypervisor_list):
482   """Gather data about all instances.
483
484   This is the equivalent of `GetInstanceInfo()`, except that it
485   computes data for all instances at once, thus being faster if one
486   needs data about more than one instance.
487
488   @type hypervisor_list: list
489   @param hypervisor_list: list of hypervisors to query for instance data
490
491   @rtype: dict of dicts
492   @return: dictionary of instance: data, with data having the following keys:
493       - memory: memory size of instance (int)
494       - state: xen state of instance (string)
495       - time: cpu time of instance (float)
496       - vcpuus: the number of vcpus
497
498   """
499   output = {}
500
501   for hname in hypervisor_list:
502     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
503     if iinfo:
504       for name, inst_id, memory, vcpus, state, times in iinfo:
505         value = {
506           'memory': memory,
507           'vcpus': vcpus,
508           'state': state,
509           'time': times,
510           }
511         if name in output and output[name] != value:
512           raise errors.HypervisorError("Instance %s running duplicate"
513                                        " with different parameters" % name)
514         output[name] = value
515
516   return output
517
518
519 def AddOSToInstance(instance):
520   """Add an OS to an instance.
521
522   @type instance: L{objects.Instance}
523   @param instance: Instance whose OS is to be installed
524
525   """
526   inst_os = OSFromDisk(instance.os)
527
528   create_script = inst_os.create_script
529   create_env = OSEnvironment(instance)
530
531   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
532                                      instance.name, int(time.time()))
533   if not os.path.exists(constants.LOG_OS_DIR):
534     os.mkdir(constants.LOG_OS_DIR, 0750)
535
536   command = utils.BuildShellCmd("cd %s && %s &>%s",
537                                 inst_os.path, create_script, logfile)
538
539   result = utils.RunCmd(command, env=create_env)
540   if result.failed:
541     logging.error("os create command '%s' returned error: %s, logfile: %s,"
542                   " output: %s", command, result.fail_reason, logfile,
543                   result.output)
544     return False
545
546   return True
547
548
549 def RunRenameInstance(instance, old_name):
550   """Run the OS rename script for an instance.
551
552   @type instance: L{objects.Instance}
553   @param instance: Instance whose OS is to be installed
554   @type old_name: string
555   @param old_name: previous instance name
556
557   """
558   inst_os = OSFromDisk(instance.os)
559
560   script = inst_os.rename_script
561   rename_env = OSEnvironment(instance)
562   rename_env['OLD_INSTANCE_NAME'] = old_name
563
564   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
565                                            old_name,
566                                            instance.name, int(time.time()))
567   if not os.path.exists(constants.LOG_OS_DIR):
568     os.mkdir(constants.LOG_OS_DIR, 0750)
569
570   command = utils.BuildShellCmd("cd %s && %s &>%s",
571                                 inst_os.path, script, logfile)
572
573   result = utils.RunCmd(command, env=rename_env)
574
575   if result.failed:
576     logging.error("os create command '%s' returned error: %s output: %s",
577                   command, result.fail_reason, result.output)
578     return False
579
580   return True
581
582
583 def _GetVGInfo(vg_name):
584   """Get informations about the volume group.
585
586   Args:
587     vg_name: the volume group
588
589   Returns:
590     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
591     where
592     vg_size is the total size of the volume group in MiB
593     vg_free is the free size of the volume group in MiB
594     pv_count are the number of physical disks in that vg
595
596   If an error occurs during gathering of data, we return the same dict
597   with keys all set to None.
598
599   """
600   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
601
602   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
603                          "--nosuffix", "--units=m", "--separator=:", vg_name])
604
605   if retval.failed:
606     logging.error("volume group %s not present", vg_name)
607     return retdic
608   valarr = retval.stdout.strip().rstrip(':').split(':')
609   if len(valarr) == 3:
610     try:
611       retdic = {
612         "vg_size": int(round(float(valarr[0]), 0)),
613         "vg_free": int(round(float(valarr[1]), 0)),
614         "pv_count": int(valarr[2]),
615         }
616     except ValueError, err:
617       logging.exception("Fail to parse vgs output")
618   else:
619     logging.error("vgs output has the wrong number of fields (expected"
620                   " three): %s", str(valarr))
621   return retdic
622
623
624 def _GatherBlockDevs(instance):
625   """Set up an instance's block device(s).
626
627   This is run on the primary node at instance startup. The block
628   devices must be already assembled.
629
630   """
631   block_devices = []
632   for disk in instance.disks:
633     device = _RecursiveFindBD(disk)
634     if device is None:
635       raise errors.BlockDeviceError("Block device '%s' is not set up." %
636                                     str(disk))
637     device.Open()
638     block_devices.append((disk, device))
639   return block_devices
640
641
642 def StartInstance(instance, extra_args):
643   """Start an instance.
644
645   @type instance: instance object
646   @param instance: the instance object
647   @rtype: boolean
648   @return: whether the startup was successful or not
649
650   """
651   running_instances = GetInstanceList([instance.hypervisor])
652
653   if instance.name in running_instances:
654     return True
655
656   block_devices = _GatherBlockDevs(instance)
657   hyper = hypervisor.GetHypervisor(instance.hypervisor)
658
659   try:
660     hyper.StartInstance(instance, block_devices, extra_args)
661   except errors.HypervisorError, err:
662     logging.exception("Failed to start instance")
663     return False
664
665   return True
666
667
668 def ShutdownInstance(instance):
669   """Shut an instance down.
670
671   @type instance: instance object
672   @param instance: the instance object
673   @rtype: boolean
674   @return: whether the startup was successful or not
675
676   """
677   hv_name = instance.hypervisor
678   running_instances = GetInstanceList([hv_name])
679
680   if instance.name not in running_instances:
681     return True
682
683   hyper = hypervisor.GetHypervisor(hv_name)
684   try:
685     hyper.StopInstance(instance)
686   except errors.HypervisorError, err:
687     logging.error("Failed to stop instance")
688     return False
689
690   # test every 10secs for 2min
691   shutdown_ok = False
692
693   time.sleep(1)
694   for dummy in range(11):
695     if instance.name not in GetInstanceList([hv_name]):
696       break
697     time.sleep(10)
698   else:
699     # the shutdown did not succeed
700     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
701
702     try:
703       hyper.StopInstance(instance, force=True)
704     except errors.HypervisorError, err:
705       logging.exception("Failed to stop instance")
706       return False
707
708     time.sleep(1)
709     if instance.name in GetInstanceList([hv_name]):
710       logging.error("could not shutdown instance '%s' even by destroy",
711                     instance.name)
712       return False
713
714   return True
715
716
717 def RebootInstance(instance, reboot_type, extra_args):
718   """Reboot an instance.
719
720   Args:
721     instance    - name of instance to reboot
722     reboot_type - how to reboot [soft,hard,full]
723
724   """
725   running_instances = GetInstanceList([instance.hypervisor])
726
727   if instance.name not in running_instances:
728     logging.error("Cannot reboot instance that is not running")
729     return False
730
731   hyper = hypervisor.GetHypervisor(instance.hypervisor)
732   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
733     try:
734       hyper.RebootInstance(instance)
735     except errors.HypervisorError, err:
736       logging.exception("Failed to soft reboot instance")
737       return False
738   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
739     try:
740       ShutdownInstance(instance)
741       StartInstance(instance, extra_args)
742     except errors.HypervisorError, err:
743       logging.exception("Failed to hard reboot instance")
744       return False
745   else:
746     raise errors.ParameterError("reboot_type invalid")
747
748   return True
749
750
751 def MigrateInstance(instance, target, live):
752   """Migrates an instance to another node.
753
754   @type instance: L{objects.Instance}
755   @param instance: the instance definition
756   @type target: string
757   @param target: the target node name
758   @type live: boolean
759   @param live: whether the migration should be done live or not (the
760       interpretation of this parameter is left to the hypervisor)
761   @rtype: tuple
762   @return: a tuple of (success, msg) where:
763       - succes is a boolean denoting the success/failure of the operation
764       - msg is a string with details in case of failure
765
766   """
767   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
768
769   try:
770     hyper.MigrateInstance(instance.name, target, live)
771   except errors.HypervisorError, err:
772     msg = "Failed to migrate instance: %s" % str(err)
773     logging.error(msg)
774     return (False, msg)
775   return (True, "Migration successfull")
776
777
778 def CreateBlockDevice(disk, size, owner, on_primary, info):
779   """Creates a block device for an instance.
780
781   @type disk: L{objects.Disk}
782   @param disk: the object describing the disk we should create
783   @type size: int
784   @param size: the size of the physical underlying device, in MiB
785   @type owner: str
786   @param owner: the name of the instance for which disk is created,
787       used for device cache data
788   @type on_primary: boolean
789   @param on_primary:  indicates if it is the primary node or not
790   @type info: string
791   @param info: string that will be sent to the physical device
792       creation, used for example to set (LVM) tags on LVs
793
794   @return: the new unique_id of the device (this can sometime be
795       computed only after creation), or None. On secondary nodes,
796       it's not required to return anything.
797
798   """
799   clist = []
800   if disk.children:
801     for child in disk.children:
802       crdev = _RecursiveAssembleBD(child, owner, on_primary)
803       if on_primary or disk.AssembleOnSecondary():
804         # we need the children open in case the device itself has to
805         # be assembled
806         crdev.Open()
807       clist.append(crdev)
808   try:
809     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
810     if device is not None:
811       logging.info("removing existing device %s", disk)
812       device.Remove()
813   except errors.BlockDeviceError, err:
814     pass
815
816   device = bdev.Create(disk.dev_type, disk.physical_id,
817                        clist, size)
818   if device is None:
819     raise ValueError("Can't create child device for %s, %s" %
820                      (disk, size))
821   if on_primary or disk.AssembleOnSecondary():
822     if not device.Assemble():
823       errorstring = "Can't assemble device after creation"
824       logging.error(errorstring)
825       raise errors.BlockDeviceError("%s, very unusual event - check the node"
826                                     " daemon logs" % errorstring)
827     device.SetSyncSpeed(constants.SYNC_SPEED)
828     if on_primary or disk.OpenOnSecondary():
829       device.Open(force=True)
830     DevCacheManager.UpdateCache(device.dev_path, owner,
831                                 on_primary, disk.iv_name)
832
833   device.SetInfo(info)
834
835   physical_id = device.unique_id
836   return physical_id
837
838
839 def RemoveBlockDevice(disk):
840   """Remove a block device.
841
842   This is intended to be called recursively.
843
844   """
845   try:
846     # since we are removing the device, allow a partial match
847     # this allows removal of broken mirrors
848     rdev = _RecursiveFindBD(disk, allow_partial=True)
849   except errors.BlockDeviceError, err:
850     # probably can't attach
851     logging.info("Can't attach to device %s in remove", disk)
852     rdev = None
853   if rdev is not None:
854     r_path = rdev.dev_path
855     result = rdev.Remove()
856     if result:
857       DevCacheManager.RemoveCache(r_path)
858   else:
859     result = True
860   if disk.children:
861     for child in disk.children:
862       result = result and RemoveBlockDevice(child)
863   return result
864
865
866 def _RecursiveAssembleBD(disk, owner, as_primary):
867   """Activate a block device for an instance.
868
869   This is run on the primary and secondary nodes for an instance.
870
871   This function is called recursively.
872
873   Args:
874     disk: a objects.Disk object
875     as_primary: if we should make the block device read/write
876
877   Returns:
878     the assembled device or None (in case no device was assembled)
879
880   If the assembly is not successful, an exception is raised.
881
882   """
883   children = []
884   if disk.children:
885     mcn = disk.ChildrenNeeded()
886     if mcn == -1:
887       mcn = 0 # max number of Nones allowed
888     else:
889       mcn = len(disk.children) - mcn # max number of Nones
890     for chld_disk in disk.children:
891       try:
892         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
893       except errors.BlockDeviceError, err:
894         if children.count(None) >= mcn:
895           raise
896         cdev = None
897         logging.debug("Error in child activation: %s", str(err))
898       children.append(cdev)
899
900   if as_primary or disk.AssembleOnSecondary():
901     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
902     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
903     result = r_dev
904     if as_primary or disk.OpenOnSecondary():
905       r_dev.Open()
906     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
907                                 as_primary, disk.iv_name)
908
909   else:
910     result = True
911   return result
912
913
914 def AssembleBlockDevice(disk, owner, as_primary):
915   """Activate a block device for an instance.
916
917   This is a wrapper over _RecursiveAssembleBD.
918
919   @rtype: str or boolean
920   @return: a C{/dev/...} path for primary nodes, and
921       C{True} for secondary nodes
922
923   """
924   result = _RecursiveAssembleBD(disk, owner, as_primary)
925   if isinstance(result, bdev.BlockDev):
926     result = result.dev_path
927   return result
928
929
930 def ShutdownBlockDevice(disk):
931   """Shut down a block device.
932
933   First, if the device is assembled (can `Attach()`), then the device
934   is shutdown. Then the children of the device are shutdown.
935
936   This function is called recursively. Note that we don't cache the
937   children or such, as oppossed to assemble, shutdown of different
938   devices doesn't require that the upper device was active.
939
940   """
941   r_dev = _RecursiveFindBD(disk)
942   if r_dev is not None:
943     r_path = r_dev.dev_path
944     result = r_dev.Shutdown()
945     if result:
946       DevCacheManager.RemoveCache(r_path)
947   else:
948     result = True
949   if disk.children:
950     for child in disk.children:
951       result = result and ShutdownBlockDevice(child)
952   return result
953
954
955 def MirrorAddChildren(parent_cdev, new_cdevs):
956   """Extend a mirrored block device.
957
958   """
959   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
960   if parent_bdev is None:
961     logging.error("Can't find parent device")
962     return False
963   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
964   if new_bdevs.count(None) > 0:
965     logging.error("Can't find new device(s) to add: %s:%s",
966                   new_bdevs, new_cdevs)
967     return False
968   parent_bdev.AddChildren(new_bdevs)
969   return True
970
971
972 def MirrorRemoveChildren(parent_cdev, new_cdevs):
973   """Shrink a mirrored block device.
974
975   """
976   parent_bdev = _RecursiveFindBD(parent_cdev)
977   if parent_bdev is None:
978     logging.error("Can't find parent in remove children: %s", parent_cdev)
979     return False
980   devs = []
981   for disk in new_cdevs:
982     rpath = disk.StaticDevPath()
983     if rpath is None:
984       bd = _RecursiveFindBD(disk)
985       if bd is None:
986         logging.error("Can't find dynamic device %s while removing children",
987                       disk)
988         return False
989       else:
990         devs.append(bd.dev_path)
991     else:
992       devs.append(rpath)
993   parent_bdev.RemoveChildren(devs)
994   return True
995
996
997 def GetMirrorStatus(disks):
998   """Get the mirroring status of a list of devices.
999
1000   Args:
1001     disks: list of `objects.Disk`
1002
1003   Returns:
1004     list of (mirror_done, estimated_time) tuples, which
1005     are the result of bdev.BlockDevice.CombinedSyncStatus()
1006
1007   """
1008   stats = []
1009   for dsk in disks:
1010     rbd = _RecursiveFindBD(dsk)
1011     if rbd is None:
1012       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1013     stats.append(rbd.CombinedSyncStatus())
1014   return stats
1015
1016
1017 def _RecursiveFindBD(disk, allow_partial=False):
1018   """Check if a device is activated.
1019
1020   If so, return informations about the real device.
1021
1022   Args:
1023     disk: the objects.Disk instance
1024     allow_partial: don't abort the find if a child of the
1025                    device can't be found; this is intended to be
1026                    used when repairing mirrors
1027
1028   Returns:
1029     None if the device can't be found
1030     otherwise the device instance
1031
1032   """
1033   children = []
1034   if disk.children:
1035     for chdisk in disk.children:
1036       children.append(_RecursiveFindBD(chdisk))
1037
1038   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1039
1040
1041 def FindBlockDevice(disk):
1042   """Check if a device is activated.
1043
1044   If so, return informations about the real device.
1045
1046   Args:
1047     disk: the objects.Disk instance
1048   Returns:
1049     None if the device can't be found
1050     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1051
1052   """
1053   rbd = _RecursiveFindBD(disk)
1054   if rbd is None:
1055     return rbd
1056   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1057
1058
1059 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1060   """Write a file to the filesystem.
1061
1062   This allows the master to overwrite(!) a file. It will only perform
1063   the operation if the file belongs to a list of configuration files.
1064
1065   """
1066   if not os.path.isabs(file_name):
1067     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1068                   file_name)
1069     return False
1070
1071   allowed_files = [
1072     constants.CLUSTER_CONF_FILE,
1073     constants.ETC_HOSTS,
1074     constants.SSH_KNOWN_HOSTS_FILE,
1075     constants.VNC_PASSWORD_FILE,
1076     ]
1077
1078   if file_name not in allowed_files:
1079     logging.error("Filename passed to UploadFile not in allowed"
1080                  " upload targets: '%s'", file_name)
1081     return False
1082
1083   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1084                   atime=atime, mtime=mtime)
1085   return True
1086
1087
1088 def _ErrnoOrStr(err):
1089   """Format an EnvironmentError exception.
1090
1091   If the `err` argument has an errno attribute, it will be looked up
1092   and converted into a textual EXXXX description. Otherwise the string
1093   representation of the error will be returned.
1094
1095   """
1096   if hasattr(err, 'errno'):
1097     detail = errno.errorcode[err.errno]
1098   else:
1099     detail = str(err)
1100   return detail
1101
1102
1103 def _OSOndiskVersion(name, os_dir):
1104   """Compute and return the API version of a given OS.
1105
1106   This function will try to read the API version of the os given by
1107   the 'name' parameter and residing in the 'os_dir' directory.
1108
1109   Return value will be either an integer denoting the version or None in the
1110   case when this is not a valid OS name.
1111
1112   """
1113   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1114
1115   try:
1116     st = os.stat(api_file)
1117   except EnvironmentError, err:
1118     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1119                            " found (%s)" % _ErrnoOrStr(err))
1120
1121   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1122     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1123                            " a regular file")
1124
1125   try:
1126     f = open(api_file)
1127     try:
1128       api_versions = f.readlines()
1129     finally:
1130       f.close()
1131   except EnvironmentError, err:
1132     raise errors.InvalidOS(name, os_dir, "error while reading the"
1133                            " API version (%s)" % _ErrnoOrStr(err))
1134
1135   api_versions = [version.strip() for version in api_versions]
1136   try:
1137     api_versions = [int(version) for version in api_versions]
1138   except (TypeError, ValueError), err:
1139     raise errors.InvalidOS(name, os_dir,
1140                            "API version is not integer (%s)" % str(err))
1141
1142   return api_versions
1143
1144
1145 def DiagnoseOS(top_dirs=None):
1146   """Compute the validity for all OSes.
1147
1148   Returns an OS object for each name in all the given top directories
1149   (if not given defaults to constants.OS_SEARCH_PATH)
1150
1151   Returns:
1152     list of OS objects
1153
1154   """
1155   if top_dirs is None:
1156     top_dirs = constants.OS_SEARCH_PATH
1157
1158   result = []
1159   for dir_name in top_dirs:
1160     if os.path.isdir(dir_name):
1161       try:
1162         f_names = utils.ListVisibleFiles(dir_name)
1163       except EnvironmentError, err:
1164         logging.exception("Can't list the OS directory %s", dir_name)
1165         break
1166       for name in f_names:
1167         try:
1168           os_inst = OSFromDisk(name, base_dir=dir_name)
1169           result.append(os_inst)
1170         except errors.InvalidOS, err:
1171           result.append(objects.OS.FromInvalidOS(err))
1172
1173   return result
1174
1175
1176 def OSFromDisk(name, base_dir=None):
1177   """Create an OS instance from disk.
1178
1179   This function will return an OS instance if the given name is a
1180   valid OS name. Otherwise, it will raise an appropriate
1181   `errors.InvalidOS` exception, detailing why this is not a valid
1182   OS.
1183
1184   @type base_dir: string
1185   @keyword base_dir: Base directory containing OS installations.
1186                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1187
1188   """
1189
1190   if base_dir is None:
1191     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1192     if os_dir is None:
1193       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1194   else:
1195     os_dir = os.path.sep.join([base_dir, name])
1196
1197   api_versions = _OSOndiskVersion(name, os_dir)
1198
1199   if constants.OS_API_VERSION not in api_versions:
1200     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1201                            " (found %s want %s)"
1202                            % (api_versions, constants.OS_API_VERSION))
1203
1204   # OS Scripts dictionary, we will populate it with the actual script names
1205   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1206
1207   for script in os_scripts:
1208     os_scripts[script] = os.path.sep.join([os_dir, script])
1209
1210     try:
1211       st = os.stat(os_scripts[script])
1212     except EnvironmentError, err:
1213       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1214                              (script, _ErrnoOrStr(err)))
1215
1216     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1217       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1218                              script)
1219
1220     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1221       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1222                              script)
1223
1224
1225   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1226                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1227                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1228                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1229                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1230                     api_versions=api_versions)
1231
1232 def OSEnvironment(instance, debug=0):
1233   """Calculate the environment for an os script.
1234
1235   @type instance: instance object
1236   @param instance: target instance for the os script run
1237   @type debug: integer
1238   @param debug: debug level (0 or 1, for os api 10)
1239   @rtype: dict
1240   @return: dict of environment variables
1241
1242   """
1243   result = {}
1244   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1245   result['INSTANCE_NAME'] = instance.name
1246   result['HYPERVISOR'] = instance.hypervisor
1247   result['DISK_COUNT'] = '%d' % len(instance.disks)
1248   result['NIC_COUNT'] = '%d' % len(instance.nics)
1249   result['DEBUG_LEVEL'] = '%d' % debug
1250   for idx, disk in enumerate(instance.disks):
1251     real_disk = _RecursiveFindBD(disk)
1252     if real_disk is None:
1253       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1254                                     str(disk))
1255     real_disk.Open()
1256     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1257     # FIXME: When disks will have read-only mode, populate this
1258     result['DISK_%d_ACCESS' % idx] = 'W'
1259     if constants.HV_DISK_TYPE in instance.hvparams:
1260       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1261         instance.hvparams[constants.HV_DISK_TYPE]
1262     if disk.dev_type in constants.LDS_BLOCK:
1263       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1264     elif disk.dev_type == constants.LD_FILE:
1265       result['DISK_%d_BACKEND_TYPE' % idx] = \
1266         'file:%s' % disk.physical_id[0]
1267   for idx, nic in enumerate(instance.nics):
1268     result['NIC_%d_MAC' % idx] = nic.mac
1269     if nic.ip:
1270       result['NIC_%d_IP' % idx] = nic.ip
1271     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1272     if constants.HV_NIC_TYPE in instance.hvparams:
1273       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1274         instance.hvparams[constants.HV_NIC_TYPE]
1275
1276   return result
1277
1278 def GrowBlockDevice(disk, amount):
1279   """Grow a stack of block devices.
1280
1281   This function is called recursively, with the childrens being the
1282   first one resize.
1283
1284   Args:
1285     disk: the disk to be grown
1286
1287   Returns: a tuple of (status, result), with:
1288     status: the result (true/false) of the operation
1289     result: the error message if the operation failed, otherwise not used
1290
1291   """
1292   r_dev = _RecursiveFindBD(disk)
1293   if r_dev is None:
1294     return False, "Cannot find block device %s" % (disk,)
1295
1296   try:
1297     r_dev.Grow(amount)
1298   except errors.BlockDeviceError, err:
1299     return False, str(err)
1300
1301   return True, None
1302
1303
1304 def SnapshotBlockDevice(disk):
1305   """Create a snapshot copy of a block device.
1306
1307   This function is called recursively, and the snapshot is actually created
1308   just for the leaf lvm backend device.
1309
1310   @type disk: L{objects.Disk}
1311   @param disk: the disk to be snapshotted
1312   @rtype: string
1313   @return: snapshot disk path
1314
1315   """
1316   if disk.children:
1317     if len(disk.children) == 1:
1318       # only one child, let's recurse on it
1319       return SnapshotBlockDevice(disk.children[0])
1320     else:
1321       # more than one child, choose one that matches
1322       for child in disk.children:
1323         if child.size == disk.size:
1324           # return implies breaking the loop
1325           return SnapshotBlockDevice(child)
1326   elif disk.dev_type == constants.LD_LV:
1327     r_dev = _RecursiveFindBD(disk)
1328     if r_dev is not None:
1329       # let's stay on the safe side and ask for the full size, for now
1330       return r_dev.Snapshot(disk.size)
1331     else:
1332       return None
1333   else:
1334     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1335                                  " '%s' of type '%s'" %
1336                                  (disk.unique_id, disk.dev_type))
1337
1338
1339 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1340   """Export a block device snapshot to a remote node.
1341
1342   @type disk: L{objects.Disk}
1343   @param disk: the description of the disk to export
1344   @type dest_node: str
1345   @param dest_node: the destination node to export to
1346   @type instance: L{objects.Instance}
1347   @param instance: the instance object to whom the disk belongs
1348   @type cluster_name: str
1349   @param cluster_name: the cluster name, needed for SSH hostalias
1350   @type idx: int
1351   @param idx: the index of the disk in the instance's disk list,
1352       used to export to the OS scripts environment
1353   @rtype: bool
1354   @return: the success of the operation
1355
1356   """
1357   export_env = OSEnvironment(instance)
1358
1359   inst_os = OSFromDisk(instance.os)
1360   export_script = inst_os.export_script
1361
1362   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1363                                      instance.name, int(time.time()))
1364   if not os.path.exists(constants.LOG_OS_DIR):
1365     os.mkdir(constants.LOG_OS_DIR, 0750)
1366   real_disk = _RecursiveFindBD(disk)
1367   if real_disk is None:
1368     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1369                                   str(disk))
1370   real_disk.Open()
1371
1372   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1373   export_env['EXPORT_INDEX'] = str(idx)
1374
1375   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1376   destfile = disk.physical_id[1]
1377
1378   # the target command is built out of three individual commands,
1379   # which are joined by pipes; we check each individual command for
1380   # valid parameters
1381   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1382                                export_script, logfile)
1383
1384   comprcmd = "gzip"
1385
1386   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1387                                 destdir, destdir, destfile)
1388   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1389                                                    constants.GANETI_RUNAS,
1390                                                    destcmd)
1391
1392   # all commands have been checked, so we're safe to combine them
1393   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1394
1395   result = utils.RunCmd(command, env=export_env)
1396
1397   if result.failed:
1398     logging.error("os snapshot export command '%s' returned error: %s"
1399                   " output: %s", command, result.fail_reason, result.output)
1400     return False
1401
1402   return True
1403
1404
1405 def FinalizeExport(instance, snap_disks):
1406   """Write out the export configuration information.
1407
1408   Args:
1409     instance: instance configuration
1410     snap_disks: snapshot block devices
1411
1412   Returns:
1413     False in case of error, True otherwise.
1414
1415   """
1416   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1417   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1418
1419   config = objects.SerializableConfigParser()
1420
1421   config.add_section(constants.INISECT_EXP)
1422   config.set(constants.INISECT_EXP, 'version', '0')
1423   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1424   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1425   config.set(constants.INISECT_EXP, 'os', instance.os)
1426   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1427
1428   config.add_section(constants.INISECT_INS)
1429   config.set(constants.INISECT_INS, 'name', instance.name)
1430   config.set(constants.INISECT_INS, 'memory', '%d' %
1431              instance.beparams[constants.BE_MEMORY])
1432   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1433              instance.beparams[constants.BE_VCPUS])
1434   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1435
1436   nic_count = 0
1437   for nic_count, nic in enumerate(instance.nics):
1438     config.set(constants.INISECT_INS, 'nic%d_mac' %
1439                nic_count, '%s' % nic.mac)
1440     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1441     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1442                '%s' % nic.bridge)
1443   # TODO: redundant: on load can read nics until it doesn't exist
1444   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1445
1446   disk_count = 0
1447   for disk_count, disk in enumerate(snap_disks):
1448     if disk:
1449       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1450                  ('%s' % disk.iv_name))
1451       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1452                  ('%s' % disk.physical_id[1]))
1453       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1454                  ('%d' % disk.size))
1455   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1456
1457   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1458   cfo = open(cff, 'w')
1459   try:
1460     config.write(cfo)
1461   finally:
1462     cfo.close()
1463
1464   shutil.rmtree(finaldestdir, True)
1465   shutil.move(destdir, finaldestdir)
1466
1467   return True
1468
1469
1470 def ExportInfo(dest):
1471   """Get export configuration information.
1472
1473   Args:
1474     dest: directory containing the export
1475
1476   Returns:
1477     A serializable config file containing the export info.
1478
1479   """
1480   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1481
1482   config = objects.SerializableConfigParser()
1483   config.read(cff)
1484
1485   if (not config.has_section(constants.INISECT_EXP) or
1486       not config.has_section(constants.INISECT_INS)):
1487     return None
1488
1489   return config
1490
1491
1492 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1493   """Import an os image into an instance.
1494
1495   @type instance: L{objects.Instance}
1496   @param instance: instance to import the disks into
1497   @type src_node: string
1498   @param src_node: source node for the disk images
1499   @type src_images: list of string
1500   @param src_images: absolute paths of the disk images
1501   @rtype: list of boolean
1502   @return: each boolean represent the success of importing the n-th disk
1503
1504   """
1505   import_env = OSEnvironment(instance)
1506   inst_os = OSFromDisk(instance.os)
1507   import_script = inst_os.import_script
1508
1509   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1510                                         instance.name, int(time.time()))
1511   if not os.path.exists(constants.LOG_OS_DIR):
1512     os.mkdir(constants.LOG_OS_DIR, 0750)
1513
1514   comprcmd = "gunzip"
1515   impcmd = utils.BuildShellCmd("(cd %s; %s &>%s)", inst_os.path, import_script,
1516                                logfile)
1517
1518   final_result = []
1519   for idx, image in enumerate(src_images):
1520     if image:
1521       destcmd = utils.BuildShellCmd('cat %s', image)
1522       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1523                                                        constants.GANETI_RUNAS,
1524                                                        destcmd)
1525       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1526       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1527       import_env['IMPORT_INDEX'] = str(idx)
1528       result = utils.RunCmd(command, env=import_env)
1529       if result.failed:
1530         logging.error("disk import command '%s' returned error: %s"
1531                       " output: %s", command, result.fail_reason, result.output)
1532         final_result.append(False)
1533       else:
1534         final_result.append(True)
1535     else:
1536       final_result.append(True)
1537
1538   return final_result
1539
1540
1541 def ListExports():
1542   """Return a list of exports currently available on this machine.
1543
1544   """
1545   if os.path.isdir(constants.EXPORT_DIR):
1546     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1547   else:
1548     return []
1549
1550
1551 def RemoveExport(export):
1552   """Remove an existing export from the node.
1553
1554   Args:
1555     export: the name of the export to remove
1556
1557   Returns:
1558     False in case of error, True otherwise.
1559
1560   """
1561   target = os.path.join(constants.EXPORT_DIR, export)
1562
1563   shutil.rmtree(target)
1564   # TODO: catch some of the relevant exceptions and provide a pretty
1565   # error message if rmtree fails.
1566
1567   return True
1568
1569
1570 def RenameBlockDevices(devlist):
1571   """Rename a list of block devices.
1572
1573   The devlist argument is a list of tuples (disk, new_logical,
1574   new_physical). The return value will be a combined boolean result
1575   (True only if all renames succeeded).
1576
1577   """
1578   result = True
1579   for disk, unique_id in devlist:
1580     dev = _RecursiveFindBD(disk)
1581     if dev is None:
1582       result = False
1583       continue
1584     try:
1585       old_rpath = dev.dev_path
1586       dev.Rename(unique_id)
1587       new_rpath = dev.dev_path
1588       if old_rpath != new_rpath:
1589         DevCacheManager.RemoveCache(old_rpath)
1590         # FIXME: we should add the new cache information here, like:
1591         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1592         # but we don't have the owner here - maybe parse from existing
1593         # cache? for now, we only lose lvm data when we rename, which
1594         # is less critical than DRBD or MD
1595     except errors.BlockDeviceError, err:
1596       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1597       result = False
1598   return result
1599
1600
1601 def _TransformFileStorageDir(file_storage_dir):
1602   """Checks whether given file_storage_dir is valid.
1603
1604   Checks wheter the given file_storage_dir is within the cluster-wide
1605   default file_storage_dir stored in SimpleStore. Only paths under that
1606   directory are allowed.
1607
1608   @type file_storage_dir: str
1609   @param file_storage_dir: the path to check
1610
1611   @return: the normalized path if valid, None otherwise
1612
1613   """
1614   cfg = _GetConfig()
1615   file_storage_dir = os.path.normpath(file_storage_dir)
1616   base_file_storage_dir = cfg.GetFileStorageDir()
1617   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1618       base_file_storage_dir):
1619     logging.error("file storage directory '%s' is not under base file"
1620                   " storage directory '%s'",
1621                   file_storage_dir, base_file_storage_dir)
1622     return None
1623   return file_storage_dir
1624
1625
1626 def CreateFileStorageDir(file_storage_dir):
1627   """Create file storage directory.
1628
1629   @type file_storage_dir: str
1630   @param file_storage_dir: directory to create
1631
1632   @rtype: tuple
1633   @return: tuple with first element a boolean indicating wheter dir
1634       creation was successful or not
1635
1636   """
1637   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1638   result = True,
1639   if not file_storage_dir:
1640     result = False,
1641   else:
1642     if os.path.exists(file_storage_dir):
1643       if not os.path.isdir(file_storage_dir):
1644         logging.error("'%s' is not a directory", file_storage_dir)
1645         result = False,
1646     else:
1647       try:
1648         os.makedirs(file_storage_dir, 0750)
1649       except OSError, err:
1650         logging.error("Cannot create file storage directory '%s': %s",
1651                       file_storage_dir, err)
1652         result = False,
1653   return result
1654
1655
1656 def RemoveFileStorageDir(file_storage_dir):
1657   """Remove file storage directory.
1658
1659   Remove it only if it's empty. If not log an error and return.
1660
1661   Args:
1662     file_storage_dir: string containing the path
1663
1664   Returns:
1665     tuple with first element a boolean indicating wheter dir
1666     removal was successful or not
1667
1668   """
1669   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1670   result = True,
1671   if not file_storage_dir:
1672     result = False,
1673   else:
1674     if os.path.exists(file_storage_dir):
1675       if not os.path.isdir(file_storage_dir):
1676         logging.error("'%s' is not a directory", file_storage_dir)
1677         result = False,
1678       # deletes dir only if empty, otherwise we want to return False
1679       try:
1680         os.rmdir(file_storage_dir)
1681       except OSError, err:
1682         logging.exception("Cannot remove file storage directory '%s'",
1683                           file_storage_dir)
1684         result = False,
1685   return result
1686
1687
1688 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1689   """Rename the file storage directory.
1690
1691   Args:
1692     old_file_storage_dir: string containing the old path
1693     new_file_storage_dir: string containing the new path
1694
1695   Returns:
1696     tuple with first element a boolean indicating wheter dir
1697     rename was successful or not
1698
1699   """
1700   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1701   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1702   result = True,
1703   if not old_file_storage_dir or not new_file_storage_dir:
1704     result = False,
1705   else:
1706     if not os.path.exists(new_file_storage_dir):
1707       if os.path.isdir(old_file_storage_dir):
1708         try:
1709           os.rename(old_file_storage_dir, new_file_storage_dir)
1710         except OSError, err:
1711           logging.exception("Cannot rename '%s' to '%s'",
1712                             old_file_storage_dir, new_file_storage_dir)
1713           result =  False,
1714       else:
1715         logging.error("'%s' is not a directory", old_file_storage_dir)
1716         result = False,
1717     else:
1718       if os.path.exists(old_file_storage_dir):
1719         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1720                       old_file_storage_dir, new_file_storage_dir)
1721         result = False,
1722   return result
1723
1724
1725 def _IsJobQueueFile(file_name):
1726   """Checks whether the given filename is in the queue directory.
1727
1728   """
1729   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1730   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1731
1732   if not result:
1733     logging.error("'%s' is not a file in the queue directory",
1734                   file_name)
1735
1736   return result
1737
1738
1739 def JobQueueUpdate(file_name, content):
1740   """Updates a file in the queue directory.
1741
1742   """
1743   if not _IsJobQueueFile(file_name):
1744     return False
1745
1746   # Write and replace the file atomically
1747   utils.WriteFile(file_name, data=content)
1748
1749   return True
1750
1751
1752 def JobQueueRename(old, new):
1753   """Renames a job queue file.
1754
1755   """
1756   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1757     return False
1758
1759   os.rename(old, new)
1760
1761   return True
1762
1763
1764 def JobQueueSetDrainFlag(drain_flag):
1765   """Set the drain flag for the queue.
1766
1767   This will set or unset the queue drain flag.
1768
1769   @type drain_flag: bool
1770   @param drain_flag: if True, will set the drain flag, otherwise reset it.
1771
1772   """
1773   if drain_flag:
1774     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1775   else:
1776     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1777
1778   return True
1779
1780
1781 def CloseBlockDevices(disks):
1782   """Closes the given block devices.
1783
1784   This means they will be switched to secondary mode (in case of DRBD).
1785
1786   """
1787   bdevs = []
1788   for cf in disks:
1789     rd = _RecursiveFindBD(cf)
1790     if rd is None:
1791       return (False, "Can't find device %s" % cf)
1792     bdevs.append(rd)
1793
1794   msg = []
1795   for rd in bdevs:
1796     try:
1797       rd.Close()
1798     except errors.BlockDeviceError, err:
1799       msg.append(str(err))
1800   if msg:
1801     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1802   else:
1803     return (True, "All devices secondary")
1804
1805
1806 def ValidateHVParams(hvname, hvparams):
1807   """Validates the given hypervisor parameters.
1808
1809   @type hvname: string
1810   @param hvname: the hypervisor name
1811   @type hvparams: dict
1812   @param hvparams: the hypervisor parameters to be validated
1813   @rtype: tuple (bool, str)
1814   @return: tuple of (success, message)
1815
1816   """
1817   try:
1818     hv_type = hypervisor.GetHypervisor(hvname)
1819     hv_type.ValidateParameters(hvparams)
1820     return (True, "Validation passed")
1821   except errors.HypervisorError, err:
1822     return (False, str(err))
1823
1824
1825 class HooksRunner(object):
1826   """Hook runner.
1827
1828   This class is instantiated on the node side (ganeti-noded) and not on
1829   the master side.
1830
1831   """
1832   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1833
1834   def __init__(self, hooks_base_dir=None):
1835     """Constructor for hooks runner.
1836
1837     Args:
1838       - hooks_base_dir: if not None, this overrides the
1839         constants.HOOKS_BASE_DIR (useful for unittests)
1840
1841     """
1842     if hooks_base_dir is None:
1843       hooks_base_dir = constants.HOOKS_BASE_DIR
1844     self._BASE_DIR = hooks_base_dir
1845
1846   @staticmethod
1847   def ExecHook(script, env):
1848     """Exec one hook script.
1849
1850     Args:
1851      - script: the full path to the script
1852      - env: the environment with which to exec the script
1853
1854     """
1855     # exec the process using subprocess and log the output
1856     fdstdin = None
1857     try:
1858       fdstdin = open("/dev/null", "r")
1859       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1860                                stderr=subprocess.STDOUT, close_fds=True,
1861                                shell=False, cwd="/", env=env)
1862       output = ""
1863       try:
1864         output = child.stdout.read(4096)
1865         child.stdout.close()
1866       except EnvironmentError, err:
1867         output += "Hook script error: %s" % str(err)
1868
1869       while True:
1870         try:
1871           result = child.wait()
1872           break
1873         except EnvironmentError, err:
1874           if err.errno == errno.EINTR:
1875             continue
1876           raise
1877     finally:
1878       # try not to leak fds
1879       for fd in (fdstdin, ):
1880         if fd is not None:
1881           try:
1882             fd.close()
1883           except EnvironmentError, err:
1884             # just log the error
1885             #logging.exception("Error while closing fd %s", fd)
1886             pass
1887
1888     return result == 0, output
1889
1890   def RunHooks(self, hpath, phase, env):
1891     """Run the scripts in the hooks directory.
1892
1893     This method will not be usually overriden by child opcodes.
1894
1895     """
1896     if phase == constants.HOOKS_PHASE_PRE:
1897       suffix = "pre"
1898     elif phase == constants.HOOKS_PHASE_POST:
1899       suffix = "post"
1900     else:
1901       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1902     rr = []
1903
1904     subdir = "%s-%s.d" % (hpath, suffix)
1905     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1906     try:
1907       dir_contents = utils.ListVisibleFiles(dir_name)
1908     except OSError, err:
1909       # must log
1910       return rr
1911
1912     # we use the standard python sort order,
1913     # so 00name is the recommended naming scheme
1914     dir_contents.sort()
1915     for relname in dir_contents:
1916       fname = os.path.join(dir_name, relname)
1917       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1918           self.RE_MASK.match(relname) is not None):
1919         rrval = constants.HKR_SKIP
1920         output = ""
1921       else:
1922         result, output = self.ExecHook(fname, env)
1923         if not result:
1924           rrval = constants.HKR_FAIL
1925         else:
1926           rrval = constants.HKR_SUCCESS
1927       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1928
1929     return rr
1930
1931
1932 class IAllocatorRunner(object):
1933   """IAllocator runner.
1934
1935   This class is instantiated on the node side (ganeti-noded) and not on
1936   the master side.
1937
1938   """
1939   def Run(self, name, idata):
1940     """Run an iallocator script.
1941
1942     Return value: tuple of:
1943        - run status (one of the IARUN_ constants)
1944        - stdout
1945        - stderr
1946        - fail reason (as from utils.RunResult)
1947
1948     """
1949     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1950                                   os.path.isfile)
1951     if alloc_script is None:
1952       return (constants.IARUN_NOTFOUND, None, None, None)
1953
1954     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1955     try:
1956       os.write(fd, idata)
1957       os.close(fd)
1958       result = utils.RunCmd([alloc_script, fin_name])
1959       if result.failed:
1960         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1961                 result.fail_reason)
1962     finally:
1963       os.unlink(fin_name)
1964
1965     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1966
1967
1968 class DevCacheManager(object):
1969   """Simple class for managing a cache of block device information.
1970
1971   """
1972   _DEV_PREFIX = "/dev/"
1973   _ROOT_DIR = constants.BDEV_CACHE_DIR
1974
1975   @classmethod
1976   def _ConvertPath(cls, dev_path):
1977     """Converts a /dev/name path to the cache file name.
1978
1979     This replaces slashes with underscores and strips the /dev
1980     prefix. It then returns the full path to the cache file
1981
1982     """
1983     if dev_path.startswith(cls._DEV_PREFIX):
1984       dev_path = dev_path[len(cls._DEV_PREFIX):]
1985     dev_path = dev_path.replace("/", "_")
1986     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1987     return fpath
1988
1989   @classmethod
1990   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1991     """Updates the cache information for a given device.
1992
1993     """
1994     if dev_path is None:
1995       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1996       return
1997     fpath = cls._ConvertPath(dev_path)
1998     if on_primary:
1999       state = "primary"
2000     else:
2001       state = "secondary"
2002     if iv_name is None:
2003       iv_name = "not_visible"
2004     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2005     try:
2006       utils.WriteFile(fpath, data=fdata)
2007     except EnvironmentError, err:
2008       logging.exception("Can't update bdev cache for %s", dev_path)
2009
2010   @classmethod
2011   def RemoveCache(cls, dev_path):
2012     """Remove data for a dev_path.
2013
2014     """
2015     if dev_path is None:
2016       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2017       return
2018     fpath = cls._ConvertPath(dev_path)
2019     try:
2020       utils.RemoveFile(fpath)
2021     except EnvironmentError, err:
2022       logging.exception("Can't update bdev cache for %s", dev_path)