Convert ImportOSIntoInstance to OS API 10
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetConfig():
48   return ssconf.SimpleConfigReader()
49
50
51 def _GetSshRunner(cluster_name):
52   return ssh.SshRunner(cluster_name)
53
54
55 def _CleanDirectory(path, exclude=[]):
56   """Removes all regular files in a directory.
57
58   @param exclude: List of files to be excluded.
59   @type exclude: list
60
61   """
62   if not os.path.isdir(path):
63     return
64
65   # Normalize excluded paths
66   exclude = [os.path.normpath(i) for i in exclude]
67
68   for rel_name in utils.ListVisibleFiles(path):
69     full_name = os.path.normpath(os.path.join(path, rel_name))
70     if full_name in exclude:
71       continue
72     if os.path.isfile(full_name) and not os.path.islink(full_name):
73       utils.RemoveFile(full_name)
74
75
76 def JobQueuePurge():
77   """Removes job queue files and archived jobs
78
79   """
80   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82
83
84 def GetMasterInfo():
85   """Returns master information.
86
87   This is an utility function to compute master information, either
88   for consumption here or from the node daemon.
89
90   @rtype: tuple
91   @return: (master_netdev, master_ip, master_name)
92
93   """
94   try:
95     cfg = _GetConfig()
96     master_netdev = cfg.GetMasterNetdev()
97     master_ip = cfg.GetMasterIP()
98     master_node = cfg.GetMasterNode()
99   except errors.ConfigurationError, err:
100     logging.exception("Cluster configuration incomplete")
101     return (None, None)
102   return (master_netdev, master_ip, master_node)
103
104
105 def StartMaster(start_daemons):
106   """Activate local node as master node.
107
108   The function will always try activate the IP address of the master
109   (if someone else has it, then it won't). Then, if the start_daemons
110   parameter is True, it will also start the master daemons
111   (ganet-masterd and ganeti-rapi).
112
113   """
114   ok = True
115   master_netdev, master_ip, _ = GetMasterInfo()
116   if not master_netdev:
117     return False
118
119   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120     if utils.OwnIpAddress(master_ip):
121       # we already have the ip:
122       logging.debug("Already started")
123     else:
124       logging.error("Someone else has the master ip, not activating")
125       ok = False
126   else:
127     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
128                            "dev", master_netdev, "label",
129                            "%s:0" % master_netdev])
130     if result.failed:
131       logging.error("Can't activate master IP: %s", result.output)
132       ok = False
133
134     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
135                            "-s", master_ip, master_ip])
136     # we'll ignore the exit code of arping
137
138   # and now start the master and rapi daemons
139   if start_daemons:
140     for daemon in 'ganeti-masterd', 'ganeti-rapi':
141       result = utils.RunCmd([daemon])
142       if result.failed:
143         logging.error("Can't start daemon %s: %s", daemon, result.output)
144         ok = False
145   return ok
146
147
148 def StopMaster(stop_daemons):
149   """Deactivate this node as master.
150
151   The function will always try to deactivate the IP address of the
152   master. Then, if the stop_daemons parameter is True, it will also
153   stop the master daemons (ganet-masterd and ganeti-rapi).
154
155   """
156   master_netdev, master_ip, _ = GetMasterInfo()
157   if not master_netdev:
158     return False
159
160   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
161                          "dev", master_netdev])
162   if result.failed:
163     logging.error("Can't remove the master IP, error: %s", result.output)
164     # but otherwise ignore the failure
165
166   if stop_daemons:
167     # stop/kill the rapi and the master daemon
168     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
169       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
170
171   return True
172
173
174 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175   """Joins this node to the cluster.
176
177   This does the following:
178       - updates the hostkeys of the machine (rsa and dsa)
179       - adds the ssh private key to the user
180       - adds the ssh public key to the users' authorized_keys file
181
182   """
183   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187   for name, content, mode in sshd_keys:
188     utils.WriteFile(name, data=content, mode=mode)
189
190   try:
191     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
192                                                     mkdir=True)
193   except errors.OpExecError, err:
194     logging.exception("Error while processing user ssh files")
195     return False
196
197   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198     utils.WriteFile(name, data=content, mode=0600)
199
200   utils.AddAuthorizedKey(auth_keys, sshpub)
201
202   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
203
204   return True
205
206
207 def LeaveCluster():
208   """Cleans up the current node and prepares it to be removed from the cluster.
209
210   """
211   _CleanDirectory(constants.DATA_DIR)
212   JobQueuePurge()
213
214   try:
215     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
216   except errors.OpExecError:
217     logging.exception("Error while processing ssh files")
218     return
219
220   f = open(pub_key, 'r')
221   try:
222     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
223   finally:
224     f.close()
225
226   utils.RemoveFile(priv_key)
227   utils.RemoveFile(pub_key)
228
229   # Return a reassuring string to the caller, and quit
230   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231
232
233 def GetNodeInfo(vgname, hypervisor_type):
234   """Gives back a hash with different informations about the node.
235
236   @type vgname: C{string}
237   @param vgname: the name of the volume group to ask for disk space information
238   @type hypervisor_type: C{str}
239   @param hypervisor_type: the name of the hypervisor to ask for
240       memory information
241   @rtype: C{dict}
242   @return: dictionary with the following keys:
243       - vg_size is the size of the configured volume group in MiB
244       - vg_free is the free size of the volume group in MiB
245       - memory_dom0 is the memory allocated for domain0 in MiB
246       - memory_free is the currently available (free) ram in MiB
247       - memory_total is the total number of ram in MiB
248
249   """
250   outputarray = {}
251   vginfo = _GetVGInfo(vgname)
252   outputarray['vg_size'] = vginfo['vg_size']
253   outputarray['vg_free'] = vginfo['vg_free']
254
255   hyper = hypervisor.GetHypervisor(hypervisor_type)
256   hyp_info = hyper.GetNodeInfo()
257   if hyp_info is not None:
258     outputarray.update(hyp_info)
259
260   f = open("/proc/sys/kernel/random/boot_id", 'r')
261   try:
262     outputarray["bootid"] = f.read(128).rstrip("\n")
263   finally:
264     f.close()
265
266   return outputarray
267
268
269 def VerifyNode(what, cluster_name):
270   """Verify the status of the local node.
271
272   Based on the input L{what} parameter, various checks are done on the
273   local node.
274
275   If the I{filelist} key is present, this list of
276   files is checksummed and the file/checksum pairs are returned.
277
278   If the I{nodelist} key is present, we check that we have
279   connectivity via ssh with the target nodes (and check the hostname
280   report).
281
282   If the I{node-net-test} key is present, we check that we have
283   connectivity to the given nodes via both primary IP and, if
284   applicable, secondary IPs.
285
286   @type what: C{dict}
287   @param what: a dictionary of things to check:
288       - filelist: list of files for which to compute checksums
289       - nodelist: list of nodes we should check ssh communication with
290       - node-net-test: list of nodes we should check node daemon port
291         connectivity with
292       - hypervisor: list with hypervisors to run the verify for
293
294   """
295   result = {}
296
297   if 'hypervisor' in what:
298     result['hypervisor'] = my_dict = {}
299     for hv_name in what['hypervisor']:
300       my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
301
302   if 'filelist' in what:
303     result['filelist'] = utils.FingerprintFiles(what['filelist'])
304
305   if 'nodelist' in what:
306     result['nodelist'] = {}
307     random.shuffle(what['nodelist'])
308     for node in what['nodelist']:
309       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
310       if not success:
311         result['nodelist'][node] = message
312   if 'node-net-test' in what:
313     result['node-net-test'] = {}
314     my_name = utils.HostInfo().name
315     my_pip = my_sip = None
316     for name, pip, sip in what['node-net-test']:
317       if name == my_name:
318         my_pip = pip
319         my_sip = sip
320         break
321     if not my_pip:
322       result['node-net-test'][my_name] = ("Can't find my own"
323                                           " primary/secondary IP"
324                                           " in the node list")
325     else:
326       port = utils.GetNodeDaemonPort()
327       for name, pip, sip in what['node-net-test']:
328         fail = []
329         if not utils.TcpPing(pip, port, source=my_pip):
330           fail.append("primary")
331         if sip != pip:
332           if not utils.TcpPing(sip, port, source=my_sip):
333             fail.append("secondary")
334         if fail:
335           result['node-net-test'][name] = ("failure using the %s"
336                                            " interface(s)" %
337                                            " and ".join(fail))
338
339   return result
340
341
342 def GetVolumeList(vg_name):
343   """Compute list of logical volumes and their size.
344
345   Returns:
346     dictionary of all partions (key) with their size (in MiB), inactive
347     and online status:
348     {'test1': ('20.06', True, True)}
349
350   """
351   lvs = {}
352   sep = '|'
353   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
354                          "--separator=%s" % sep,
355                          "-olv_name,lv_size,lv_attr", vg_name])
356   if result.failed:
357     logging.error("Failed to list logical volumes, lvs output: %s",
358                   result.output)
359     return result.output
360
361   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
362   for line in result.stdout.splitlines():
363     line = line.strip()
364     match = valid_line_re.match(line)
365     if not match:
366       logging.error("Invalid line returned from lvs output: '%s'", line)
367       continue
368     name, size, attr = match.groups()
369     inactive = attr[4] == '-'
370     online = attr[5] == 'o'
371     lvs[name] = (size, inactive, online)
372
373   return lvs
374
375
376 def ListVolumeGroups():
377   """List the volume groups and their size.
378
379   Returns:
380     Dictionary with keys volume name and values the size of the volume
381
382   """
383   return utils.ListVolumeGroups()
384
385
386 def NodeVolumes():
387   """List all volumes on this node.
388
389   """
390   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
391                          "--separator=|",
392                          "--options=lv_name,lv_size,devices,vg_name"])
393   if result.failed:
394     logging.error("Failed to list logical volumes, lvs output: %s",
395                   result.output)
396     return {}
397
398   def parse_dev(dev):
399     if '(' in dev:
400       return dev.split('(')[0]
401     else:
402       return dev
403
404   def map_line(line):
405     return {
406       'name': line[0].strip(),
407       'size': line[1].strip(),
408       'dev': parse_dev(line[2].strip()),
409       'vg': line[3].strip(),
410     }
411
412   return [map_line(line.split('|')) for line in result.stdout.splitlines()
413           if line.count('|') >= 3]
414
415
416 def BridgesExist(bridges_list):
417   """Check if a list of bridges exist on the current node.
418
419   Returns:
420     True if all of them exist, false otherwise
421
422   """
423   for bridge in bridges_list:
424     if not utils.BridgeExists(bridge):
425       return False
426
427   return True
428
429
430 def GetInstanceList(hypervisor_list):
431   """Provides a list of instances.
432
433   @type hypervisor_list: list
434   @param hypervisor_list: the list of hypervisors to query information
435
436   @rtype: list
437   @return: a list of all running instances on the current node
438              - instance1.example.com
439              - instance2.example.com
440
441   """
442   results = []
443   for hname in hypervisor_list:
444     try:
445       names = hypervisor.GetHypervisor(hname).ListInstances()
446       results.extend(names)
447     except errors.HypervisorError, err:
448       logging.exception("Error enumerating instances for hypevisor %s", hname)
449       # FIXME: should we somehow not propagate this to the master?
450       raise
451
452   return results
453
454
455 def GetInstanceInfo(instance, hname):
456   """Gives back the informations about an instance as a dictionary.
457
458   @type instance: string
459   @param instance: the instance name
460   @type hname: string
461   @param hname: the hypervisor type of the instance
462
463   @rtype: dict
464   @return: dictionary with the following keys:
465       - memory: memory size of instance (int)
466       - state: xen state of instance (string)
467       - time: cpu time of instance (float)
468
469   """
470   output = {}
471
472   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
473   if iinfo is not None:
474     output['memory'] = iinfo[2]
475     output['state'] = iinfo[4]
476     output['time'] = iinfo[5]
477
478   return output
479
480
481 def GetAllInstancesInfo(hypervisor_list):
482   """Gather data about all instances.
483
484   This is the equivalent of `GetInstanceInfo()`, except that it
485   computes data for all instances at once, thus being faster if one
486   needs data about more than one instance.
487
488   @type hypervisor_list: list
489   @param hypervisor_list: list of hypervisors to query for instance data
490
491   @rtype: dict of dicts
492   @return: dictionary of instance: data, with data having the following keys:
493       - memory: memory size of instance (int)
494       - state: xen state of instance (string)
495       - time: cpu time of instance (float)
496       - vcpuus: the number of vcpus
497
498   """
499   output = {}
500
501   for hname in hypervisor_list:
502     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
503     if iinfo:
504       for name, inst_id, memory, vcpus, state, times in iinfo:
505         value = {
506           'memory': memory,
507           'vcpus': vcpus,
508           'state': state,
509           'time': times,
510           }
511         if name in output and output[name] != value:
512           raise errors.HypervisorError("Instance %s running duplicate"
513                                        " with different parameters" % name)
514         output[name] = value
515
516   return output
517
518
519 def AddOSToInstance(instance):
520   """Add an OS to an instance.
521
522   @type instance: L{objects.Instance}
523   @param instance: Instance whose OS is to be installed
524
525   """
526   inst_os = OSFromDisk(instance.os)
527
528   create_script = inst_os.create_script
529   create_env = OSEnvironment(instance)
530
531   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
532                                      instance.name, int(time.time()))
533   if not os.path.exists(constants.LOG_OS_DIR):
534     os.mkdir(constants.LOG_OS_DIR, 0750)
535
536   command = utils.BuildShellCmd("cd %s && %s &>%s",
537                                 inst_os.path, create_script, logfile)
538
539   result = utils.RunCmd(command, env=create_env)
540   if result.failed:
541     logging.error("os create command '%s' returned error: %s, logfile: %s,"
542                   " output: %s", command, result.fail_reason, logfile,
543                   result.output)
544     return False
545
546   return True
547
548
549 def RunRenameInstance(instance, old_name):
550   """Run the OS rename script for an instance.
551
552   @type instance: objects.Instance
553   @param instance: Instance whose OS is to be installed
554   @type old_name: string
555   @param old_name: previous instance name
556
557   """
558   inst_os = OSFromDisk(instance.os)
559
560   script = inst_os.rename_script
561   rename_env = OSEnvironment(instance)
562   rename_env['OLD_INSTANCE_NAME'] = old_name
563
564   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
565                                            old_name,
566                                            instance.name, int(time.time()))
567   if not os.path.exists(constants.LOG_OS_DIR):
568     os.mkdir(constants.LOG_OS_DIR, 0750)
569
570   command = utils.BuildShellCmd("cd %s && %s &>%s",
571                                 inst_os.path, script, logfile)
572
573   result = utils.RunCmd(command, env=rename_env)
574
575   if result.failed:
576     logging.error("os create command '%s' returned error: %s output: %s",
577                   command, result.fail_reason, result.output)
578     return False
579
580   return True
581
582
583 def _GetVGInfo(vg_name):
584   """Get informations about the volume group.
585
586   Args:
587     vg_name: the volume group
588
589   Returns:
590     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
591     where
592     vg_size is the total size of the volume group in MiB
593     vg_free is the free size of the volume group in MiB
594     pv_count are the number of physical disks in that vg
595
596   If an error occurs during gathering of data, we return the same dict
597   with keys all set to None.
598
599   """
600   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
601
602   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
603                          "--nosuffix", "--units=m", "--separator=:", vg_name])
604
605   if retval.failed:
606     logging.error("volume group %s not present", vg_name)
607     return retdic
608   valarr = retval.stdout.strip().rstrip(':').split(':')
609   if len(valarr) == 3:
610     try:
611       retdic = {
612         "vg_size": int(round(float(valarr[0]), 0)),
613         "vg_free": int(round(float(valarr[1]), 0)),
614         "pv_count": int(valarr[2]),
615         }
616     except ValueError, err:
617       logging.exception("Fail to parse vgs output")
618   else:
619     logging.error("vgs output has the wrong number of fields (expected"
620                   " three): %s", str(valarr))
621   return retdic
622
623
624 def _GatherBlockDevs(instance):
625   """Set up an instance's block device(s).
626
627   This is run on the primary node at instance startup. The block
628   devices must be already assembled.
629
630   """
631   block_devices = []
632   for disk in instance.disks:
633     device = _RecursiveFindBD(disk)
634     if device is None:
635       raise errors.BlockDeviceError("Block device '%s' is not set up." %
636                                     str(disk))
637     device.Open()
638     block_devices.append((disk, device))
639   return block_devices
640
641
642 def StartInstance(instance, extra_args):
643   """Start an instance.
644
645   @type instance: instance object
646   @param instance: the instance object
647   @rtype: boolean
648   @return: whether the startup was successful or not
649
650   """
651   running_instances = GetInstanceList([instance.hypervisor])
652
653   if instance.name in running_instances:
654     return True
655
656   block_devices = _GatherBlockDevs(instance)
657   hyper = hypervisor.GetHypervisor(instance.hypervisor)
658
659   try:
660     hyper.StartInstance(instance, block_devices, extra_args)
661   except errors.HypervisorError, err:
662     logging.exception("Failed to start instance")
663     return False
664
665   return True
666
667
668 def ShutdownInstance(instance):
669   """Shut an instance down.
670
671   @type instance: instance object
672   @param instance: the instance object
673   @rtype: boolean
674   @return: whether the startup was successful or not
675
676   """
677   hv_name = instance.hypervisor
678   running_instances = GetInstanceList([hv_name])
679
680   if instance.name not in running_instances:
681     return True
682
683   hyper = hypervisor.GetHypervisor(hv_name)
684   try:
685     hyper.StopInstance(instance)
686   except errors.HypervisorError, err:
687     logging.error("Failed to stop instance")
688     return False
689
690   # test every 10secs for 2min
691   shutdown_ok = False
692
693   time.sleep(1)
694   for dummy in range(11):
695     if instance.name not in GetInstanceList([hv_name]):
696       break
697     time.sleep(10)
698   else:
699     # the shutdown did not succeed
700     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
701
702     try:
703       hyper.StopInstance(instance, force=True)
704     except errors.HypervisorError, err:
705       logging.exception("Failed to stop instance")
706       return False
707
708     time.sleep(1)
709     if instance.name in GetInstanceList([hv_name]):
710       logging.error("could not shutdown instance '%s' even by destroy",
711                     instance.name)
712       return False
713
714   return True
715
716
717 def RebootInstance(instance, reboot_type, extra_args):
718   """Reboot an instance.
719
720   Args:
721     instance    - name of instance to reboot
722     reboot_type - how to reboot [soft,hard,full]
723
724   """
725   running_instances = GetInstanceList([instance.hypervisor])
726
727   if instance.name not in running_instances:
728     logging.error("Cannot reboot instance that is not running")
729     return False
730
731   hyper = hypervisor.GetHypervisor(instance.hypervisor)
732   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
733     try:
734       hyper.RebootInstance(instance)
735     except errors.HypervisorError, err:
736       logging.exception("Failed to soft reboot instance")
737       return False
738   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
739     try:
740       ShutdownInstance(instance)
741       StartInstance(instance, extra_args)
742     except errors.HypervisorError, err:
743       logging.exception("Failed to hard reboot instance")
744       return False
745   else:
746     raise errors.ParameterError("reboot_type invalid")
747
748   return True
749
750
751 def MigrateInstance(instance, target, live):
752   """Migrates an instance to another node.
753
754   @type instance: C{objects.Instance}
755   @param instance: the instance definition
756   @type target: string
757   @param target: the target node name
758   @type live: boolean
759   @param live: whether the migration should be done live or not (the
760       interpretation of this parameter is left to the hypervisor)
761   @rtype: tuple
762   @return: a tuple of (success, msg) where:
763       - succes is a boolean denoting the success/failure of the operation
764       - msg is a string with details in case of failure
765
766   """
767   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
768
769   try:
770     hyper.MigrateInstance(instance.name, target, live)
771   except errors.HypervisorError, err:
772     msg = "Failed to migrate instance: %s" % str(err)
773     logging.error(msg)
774     return (False, msg)
775   return (True, "Migration successfull")
776
777
778 def CreateBlockDevice(disk, size, owner, on_primary, info):
779   """Creates a block device for an instance.
780
781   Args:
782    disk: a ganeti.objects.Disk object
783    size: the size of the physical underlying device
784    owner: a string with the name of the instance
785    on_primary: a boolean indicating if it is the primary node or not
786    info: string that will be sent to the physical device creation
787
788   Returns:
789     the new unique_id of the device (this can sometime be
790     computed only after creation), or None. On secondary nodes,
791     it's not required to return anything.
792
793   """
794   clist = []
795   if disk.children:
796     for child in disk.children:
797       crdev = _RecursiveAssembleBD(child, owner, on_primary)
798       if on_primary or disk.AssembleOnSecondary():
799         # we need the children open in case the device itself has to
800         # be assembled
801         crdev.Open()
802       clist.append(crdev)
803   try:
804     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
805     if device is not None:
806       logging.info("removing existing device %s", disk)
807       device.Remove()
808   except errors.BlockDeviceError, err:
809     pass
810
811   device = bdev.Create(disk.dev_type, disk.physical_id,
812                        clist, size)
813   if device is None:
814     raise ValueError("Can't create child device for %s, %s" %
815                      (disk, size))
816   if on_primary or disk.AssembleOnSecondary():
817     if not device.Assemble():
818       errorstring = "Can't assemble device after creation"
819       logging.error(errorstring)
820       raise errors.BlockDeviceError("%s, very unusual event - check the node"
821                                     " daemon logs" % errorstring)
822     device.SetSyncSpeed(constants.SYNC_SPEED)
823     if on_primary or disk.OpenOnSecondary():
824       device.Open(force=True)
825     DevCacheManager.UpdateCache(device.dev_path, owner,
826                                 on_primary, disk.iv_name)
827
828   device.SetInfo(info)
829
830   physical_id = device.unique_id
831   return physical_id
832
833
834 def RemoveBlockDevice(disk):
835   """Remove a block device.
836
837   This is intended to be called recursively.
838
839   """
840   try:
841     # since we are removing the device, allow a partial match
842     # this allows removal of broken mirrors
843     rdev = _RecursiveFindBD(disk, allow_partial=True)
844   except errors.BlockDeviceError, err:
845     # probably can't attach
846     logging.info("Can't attach to device %s in remove", disk)
847     rdev = None
848   if rdev is not None:
849     r_path = rdev.dev_path
850     result = rdev.Remove()
851     if result:
852       DevCacheManager.RemoveCache(r_path)
853   else:
854     result = True
855   if disk.children:
856     for child in disk.children:
857       result = result and RemoveBlockDevice(child)
858   return result
859
860
861 def _RecursiveAssembleBD(disk, owner, as_primary):
862   """Activate a block device for an instance.
863
864   This is run on the primary and secondary nodes for an instance.
865
866   This function is called recursively.
867
868   Args:
869     disk: a objects.Disk object
870     as_primary: if we should make the block device read/write
871
872   Returns:
873     the assembled device or None (in case no device was assembled)
874
875   If the assembly is not successful, an exception is raised.
876
877   """
878   children = []
879   if disk.children:
880     mcn = disk.ChildrenNeeded()
881     if mcn == -1:
882       mcn = 0 # max number of Nones allowed
883     else:
884       mcn = len(disk.children) - mcn # max number of Nones
885     for chld_disk in disk.children:
886       try:
887         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
888       except errors.BlockDeviceError, err:
889         if children.count(None) >= mcn:
890           raise
891         cdev = None
892         logging.debug("Error in child activation: %s", str(err))
893       children.append(cdev)
894
895   if as_primary or disk.AssembleOnSecondary():
896     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
897     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
898     result = r_dev
899     if as_primary or disk.OpenOnSecondary():
900       r_dev.Open()
901     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
902                                 as_primary, disk.iv_name)
903
904   else:
905     result = True
906   return result
907
908
909 def AssembleBlockDevice(disk, owner, as_primary):
910   """Activate a block device for an instance.
911
912   This is a wrapper over _RecursiveAssembleBD.
913
914   Returns:
915     a /dev path for primary nodes
916     True for secondary nodes
917
918   """
919   result = _RecursiveAssembleBD(disk, owner, as_primary)
920   if isinstance(result, bdev.BlockDev):
921     result = result.dev_path
922   return result
923
924
925 def ShutdownBlockDevice(disk):
926   """Shut down a block device.
927
928   First, if the device is assembled (can `Attach()`), then the device
929   is shutdown. Then the children of the device are shutdown.
930
931   This function is called recursively. Note that we don't cache the
932   children or such, as oppossed to assemble, shutdown of different
933   devices doesn't require that the upper device was active.
934
935   """
936   r_dev = _RecursiveFindBD(disk)
937   if r_dev is not None:
938     r_path = r_dev.dev_path
939     result = r_dev.Shutdown()
940     if result:
941       DevCacheManager.RemoveCache(r_path)
942   else:
943     result = True
944   if disk.children:
945     for child in disk.children:
946       result = result and ShutdownBlockDevice(child)
947   return result
948
949
950 def MirrorAddChildren(parent_cdev, new_cdevs):
951   """Extend a mirrored block device.
952
953   """
954   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
955   if parent_bdev is None:
956     logging.error("Can't find parent device")
957     return False
958   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
959   if new_bdevs.count(None) > 0:
960     logging.error("Can't find new device(s) to add: %s:%s",
961                   new_bdevs, new_cdevs)
962     return False
963   parent_bdev.AddChildren(new_bdevs)
964   return True
965
966
967 def MirrorRemoveChildren(parent_cdev, new_cdevs):
968   """Shrink a mirrored block device.
969
970   """
971   parent_bdev = _RecursiveFindBD(parent_cdev)
972   if parent_bdev is None:
973     logging.error("Can't find parent in remove children: %s", parent_cdev)
974     return False
975   devs = []
976   for disk in new_cdevs:
977     rpath = disk.StaticDevPath()
978     if rpath is None:
979       bd = _RecursiveFindBD(disk)
980       if bd is None:
981         logging.error("Can't find dynamic device %s while removing children",
982                       disk)
983         return False
984       else:
985         devs.append(bd.dev_path)
986     else:
987       devs.append(rpath)
988   parent_bdev.RemoveChildren(devs)
989   return True
990
991
992 def GetMirrorStatus(disks):
993   """Get the mirroring status of a list of devices.
994
995   Args:
996     disks: list of `objects.Disk`
997
998   Returns:
999     list of (mirror_done, estimated_time) tuples, which
1000     are the result of bdev.BlockDevice.CombinedSyncStatus()
1001
1002   """
1003   stats = []
1004   for dsk in disks:
1005     rbd = _RecursiveFindBD(dsk)
1006     if rbd is None:
1007       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1008     stats.append(rbd.CombinedSyncStatus())
1009   return stats
1010
1011
1012 def _RecursiveFindBD(disk, allow_partial=False):
1013   """Check if a device is activated.
1014
1015   If so, return informations about the real device.
1016
1017   Args:
1018     disk: the objects.Disk instance
1019     allow_partial: don't abort the find if a child of the
1020                    device can't be found; this is intended to be
1021                    used when repairing mirrors
1022
1023   Returns:
1024     None if the device can't be found
1025     otherwise the device instance
1026
1027   """
1028   children = []
1029   if disk.children:
1030     for chdisk in disk.children:
1031       children.append(_RecursiveFindBD(chdisk))
1032
1033   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1034
1035
1036 def FindBlockDevice(disk):
1037   """Check if a device is activated.
1038
1039   If so, return informations about the real device.
1040
1041   Args:
1042     disk: the objects.Disk instance
1043   Returns:
1044     None if the device can't be found
1045     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1046
1047   """
1048   rbd = _RecursiveFindBD(disk)
1049   if rbd is None:
1050     return rbd
1051   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1052
1053
1054 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1055   """Write a file to the filesystem.
1056
1057   This allows the master to overwrite(!) a file. It will only perform
1058   the operation if the file belongs to a list of configuration files.
1059
1060   """
1061   if not os.path.isabs(file_name):
1062     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1063                   file_name)
1064     return False
1065
1066   allowed_files = [
1067     constants.CLUSTER_CONF_FILE,
1068     constants.ETC_HOSTS,
1069     constants.SSH_KNOWN_HOSTS_FILE,
1070     constants.VNC_PASSWORD_FILE,
1071     ]
1072
1073   if file_name not in allowed_files:
1074     logging.error("Filename passed to UploadFile not in allowed"
1075                  " upload targets: '%s'", file_name)
1076     return False
1077
1078   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1079                   atime=atime, mtime=mtime)
1080   return True
1081
1082
1083 def _ErrnoOrStr(err):
1084   """Format an EnvironmentError exception.
1085
1086   If the `err` argument has an errno attribute, it will be looked up
1087   and converted into a textual EXXXX description. Otherwise the string
1088   representation of the error will be returned.
1089
1090   """
1091   if hasattr(err, 'errno'):
1092     detail = errno.errorcode[err.errno]
1093   else:
1094     detail = str(err)
1095   return detail
1096
1097
1098 def _OSOndiskVersion(name, os_dir):
1099   """Compute and return the API version of a given OS.
1100
1101   This function will try to read the API version of the os given by
1102   the 'name' parameter and residing in the 'os_dir' directory.
1103
1104   Return value will be either an integer denoting the version or None in the
1105   case when this is not a valid OS name.
1106
1107   """
1108   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1109
1110   try:
1111     st = os.stat(api_file)
1112   except EnvironmentError, err:
1113     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1114                            " found (%s)" % _ErrnoOrStr(err))
1115
1116   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1117     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1118                            " a regular file")
1119
1120   try:
1121     f = open(api_file)
1122     try:
1123       api_versions = f.readlines()
1124     finally:
1125       f.close()
1126   except EnvironmentError, err:
1127     raise errors.InvalidOS(name, os_dir, "error while reading the"
1128                            " API version (%s)" % _ErrnoOrStr(err))
1129
1130   api_versions = [version.strip() for version in api_versions]
1131   try:
1132     api_versions = [int(version) for version in api_versions]
1133   except (TypeError, ValueError), err:
1134     raise errors.InvalidOS(name, os_dir,
1135                            "API version is not integer (%s)" % str(err))
1136
1137   return api_versions
1138
1139
1140 def DiagnoseOS(top_dirs=None):
1141   """Compute the validity for all OSes.
1142
1143   Returns an OS object for each name in all the given top directories
1144   (if not given defaults to constants.OS_SEARCH_PATH)
1145
1146   Returns:
1147     list of OS objects
1148
1149   """
1150   if top_dirs is None:
1151     top_dirs = constants.OS_SEARCH_PATH
1152
1153   result = []
1154   for dir_name in top_dirs:
1155     if os.path.isdir(dir_name):
1156       try:
1157         f_names = utils.ListVisibleFiles(dir_name)
1158       except EnvironmentError, err:
1159         logging.exception("Can't list the OS directory %s", dir_name)
1160         break
1161       for name in f_names:
1162         try:
1163           os_inst = OSFromDisk(name, base_dir=dir_name)
1164           result.append(os_inst)
1165         except errors.InvalidOS, err:
1166           result.append(objects.OS.FromInvalidOS(err))
1167
1168   return result
1169
1170
1171 def OSFromDisk(name, base_dir=None):
1172   """Create an OS instance from disk.
1173
1174   This function will return an OS instance if the given name is a
1175   valid OS name. Otherwise, it will raise an appropriate
1176   `errors.InvalidOS` exception, detailing why this is not a valid
1177   OS.
1178
1179   @type base_dir: string
1180   @keyword base_dir: Base directory containing OS installations.
1181                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1182
1183   """
1184
1185   if base_dir is None:
1186     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1187     if os_dir is None:
1188       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1189   else:
1190     os_dir = os.path.sep.join([base_dir, name])
1191
1192   api_versions = _OSOndiskVersion(name, os_dir)
1193
1194   if constants.OS_API_VERSION not in api_versions:
1195     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1196                            " (found %s want %s)"
1197                            % (api_versions, constants.OS_API_VERSION))
1198
1199   # OS Scripts dictionary, we will populate it with the actual script names
1200   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1201
1202   for script in os_scripts:
1203     os_scripts[script] = os.path.sep.join([os_dir, script])
1204
1205     try:
1206       st = os.stat(os_scripts[script])
1207     except EnvironmentError, err:
1208       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1209                              (script, _ErrnoOrStr(err)))
1210
1211     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1212       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1213                              script)
1214
1215     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1216       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1217                              script)
1218
1219
1220   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1221                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1222                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1223                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1224                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1225                     api_versions=api_versions)
1226
1227 def OSEnvironment(instance, debug=0):
1228   """Calculate the environment for an os script.
1229
1230   @type instance: instance object
1231   @param instance: target instance for the os script run
1232   @type debug: integer
1233   @param debug: debug level (0 or 1, for os api 10)
1234   @rtype: dict
1235   @return: dict of environment variables
1236
1237   """
1238   result = {}
1239   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1240   result['INSTANCE_NAME'] = instance.name
1241   result['HYPERVISOR'] = instance.hypervisor
1242   result['DISK_COUNT'] = '%d' % len(instance.disks)
1243   result['NIC_COUNT'] = '%d' % len(instance.nics)
1244   result['DEBUG_LEVEL'] = '%d' % debug
1245   for idx, disk in enumerate(instance.disks):
1246     real_disk = _RecursiveFindBD(disk)
1247     if real_disk is None:
1248       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1249                                     str(disk))
1250     real_disk.Open()
1251     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1252     # FIXME: When disks will have read-only mode, populate this
1253     result['DISK_%d_ACCESS' % idx] = 'W'
1254     if constants.HV_DISK_TYPE in instance.hvparams:
1255       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1256         instance.hvparams[constants.HV_DISK_TYPE]
1257     if disk.dev_type in constants.LDS_BLOCK:
1258       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1259     elif disk.dev_type == constants.LD_FILE:
1260       result['DISK_%d_BACKEND_TYPE' % idx] = \
1261         'file:%s' % disk.physical_id[0]
1262   for idx, nic in enumerate(instance.nics):
1263     result['NIC_%d_MAC' % idx] = nic.mac
1264     if nic.ip:
1265       result['NIC_%d_IP' % idx] = nic.ip
1266     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1267     if constants.HV_NIC_TYPE in instance.hvparams:
1268       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1269         instance.hvparams[constants.HV_NIC_TYPE]
1270
1271   return result
1272
1273 def GrowBlockDevice(disk, amount):
1274   """Grow a stack of block devices.
1275
1276   This function is called recursively, with the childrens being the
1277   first one resize.
1278
1279   Args:
1280     disk: the disk to be grown
1281
1282   Returns: a tuple of (status, result), with:
1283     status: the result (true/false) of the operation
1284     result: the error message if the operation failed, otherwise not used
1285
1286   """
1287   r_dev = _RecursiveFindBD(disk)
1288   if r_dev is None:
1289     return False, "Cannot find block device %s" % (disk,)
1290
1291   try:
1292     r_dev.Grow(amount)
1293   except errors.BlockDeviceError, err:
1294     return False, str(err)
1295
1296   return True, None
1297
1298
1299 def SnapshotBlockDevice(disk):
1300   """Create a snapshot copy of a block device.
1301
1302   This function is called recursively, and the snapshot is actually created
1303   just for the leaf lvm backend device.
1304
1305   @type disk: L{objects.Disk}
1306   @param disk: the disk to be snapshotted
1307   @rtype: string
1308   @return: snapshot disk path
1309
1310   """
1311   if disk.children:
1312     if len(disk.children) == 1:
1313       # only one child, let's recurse on it
1314       return SnapshotBlockDevice(disk.children[0])
1315     else:
1316       # more than one child, choose one that matches
1317       for child in disk.children:
1318         if child.size == disk.size:
1319           # return implies breaking the loop
1320           return SnapshotBlockDevice(child)
1321   elif disk.dev_type == constants.LD_LV:
1322     r_dev = _RecursiveFindBD(disk)
1323     if r_dev is not None:
1324       # let's stay on the safe side and ask for the full size, for now
1325       return r_dev.Snapshot(disk.size)
1326     else:
1327       return None
1328   else:
1329     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1330                                  " '%s' of type '%s'" %
1331                                  (disk.unique_id, disk.dev_type))
1332
1333
1334 def ExportSnapshot(disk, dest_node, instance, cluster_name):
1335   """Export a block device snapshot to a remote node.
1336
1337   Args:
1338     disk: the snapshot block device
1339     dest_node: the node to send the image to
1340     instance: instance being exported
1341
1342   Returns:
1343     True if successful, False otherwise.
1344
1345   """
1346   export_env = OSEnvironment(instance)
1347
1348   inst_os = OSFromDisk(instance.os)
1349   export_script = inst_os.export_script
1350
1351   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1352                                      instance.name, int(time.time()))
1353   if not os.path.exists(constants.LOG_OS_DIR):
1354     os.mkdir(constants.LOG_OS_DIR, 0750)
1355   real_disk = _RecursiveFindBD(disk)
1356   if real_disk is None:
1357     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1358                                   str(disk))
1359   real_disk.Open()
1360
1361   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1362
1363   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1364   destfile = disk.physical_id[1]
1365
1366   # the target command is built out of three individual commands,
1367   # which are joined by pipes; we check each individual command for
1368   # valid parameters
1369   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1370                                export_script, logfile)
1371
1372   comprcmd = "gzip"
1373
1374   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1375                                 destdir, destdir, destfile)
1376   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1377                                                    constants.GANETI_RUNAS,
1378                                                    destcmd)
1379
1380   # all commands have been checked, so we're safe to combine them
1381   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1382
1383   result = utils.RunCmd(command, env=export_env)
1384
1385   if result.failed:
1386     logging.error("os snapshot export command '%s' returned error: %s"
1387                   " output: %s", command, result.fail_reason, result.output)
1388     return False
1389
1390   return True
1391
1392
1393 def FinalizeExport(instance, snap_disks):
1394   """Write out the export configuration information.
1395
1396   Args:
1397     instance: instance configuration
1398     snap_disks: snapshot block devices
1399
1400   Returns:
1401     False in case of error, True otherwise.
1402
1403   """
1404   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1405   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1406
1407   config = objects.SerializableConfigParser()
1408
1409   config.add_section(constants.INISECT_EXP)
1410   config.set(constants.INISECT_EXP, 'version', '0')
1411   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1412   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1413   config.set(constants.INISECT_EXP, 'os', instance.os)
1414   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1415
1416   config.add_section(constants.INISECT_INS)
1417   config.set(constants.INISECT_INS, 'name', instance.name)
1418   config.set(constants.INISECT_INS, 'memory', '%d' %
1419              instance.beparams[constants.BE_MEMORY])
1420   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1421              instance.beparams[constants.BE_VCPUS])
1422   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1423
1424   nic_count = 0
1425   for nic_count, nic in enumerate(instance.nics):
1426     config.set(constants.INISECT_INS, 'nic%d_mac' %
1427                nic_count, '%s' % nic.mac)
1428     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1429     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1430                '%s' % nic.bridge)
1431   # TODO: redundant: on load can read nics until it doesn't exist
1432   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1433
1434   disk_count = 0
1435   for disk_count, disk in enumerate(snap_disks):
1436     if disk:
1437       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1438                  ('%s' % disk.iv_name))
1439       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1440                  ('%s' % disk.physical_id[1]))
1441       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1442                  ('%d' % disk.size))
1443   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1444
1445   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1446   cfo = open(cff, 'w')
1447   try:
1448     config.write(cfo)
1449   finally:
1450     cfo.close()
1451
1452   shutil.rmtree(finaldestdir, True)
1453   shutil.move(destdir, finaldestdir)
1454
1455   return True
1456
1457
1458 def ExportInfo(dest):
1459   """Get export configuration information.
1460
1461   Args:
1462     dest: directory containing the export
1463
1464   Returns:
1465     A serializable config file containing the export info.
1466
1467   """
1468   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1469
1470   config = objects.SerializableConfigParser()
1471   config.read(cff)
1472
1473   if (not config.has_section(constants.INISECT_EXP) or
1474       not config.has_section(constants.INISECT_INS)):
1475     return None
1476
1477   return config
1478
1479
1480 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1481   """Import an os image into an instance.
1482
1483   @type instance: L{objects.instance}
1484   @param instance: instance to import the disks into
1485   @type src_node: string
1486   @param src_node: source node for the disk images
1487   @type src_images: list of string
1488   @param src_images: absolute paths of the disk images
1489   @rtype: list of boolean
1490   @return: each boolean represent the success of importing the n-th disk
1491
1492   """
1493   import_env = OSEnvironment(instance)
1494   inst_os = OSFromDisk(instance.os)
1495   import_script = inst_os.import_script
1496
1497   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1498                                         instance.name, int(time.time()))
1499   if not os.path.exists(constants.LOG_OS_DIR):
1500     os.mkdir(constants.LOG_OS_DIR, 0750)
1501
1502   comprcmd = "gunzip"
1503   impcmd = utils.BuildShellCmd("(cd %s; %s &>%s)", inst_os.path, import_script,
1504                                logfile)
1505
1506   final_result = []
1507   for idx, image in enumerate(src_images):
1508     if image:
1509       destcmd = utils.BuildShellCmd('cat %s', image)
1510       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1511                                                        constants.GANETI_RUNAS,
1512                                                        destcmd)
1513       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1514       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1515       result = utils.RunCmd(command, env=import_env)
1516       if result.failed:
1517         logging.error("disk import command '%s' returned error: %s"
1518                       " output: %s", command, result.fail_reason, result.output)
1519         final_result.append(False)
1520       else:
1521         final_result.append(True)
1522     else:
1523       final_result.append(True)
1524
1525   return final_result
1526
1527
1528 def ListExports():
1529   """Return a list of exports currently available on this machine.
1530
1531   """
1532   if os.path.isdir(constants.EXPORT_DIR):
1533     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1534   else:
1535     return []
1536
1537
1538 def RemoveExport(export):
1539   """Remove an existing export from the node.
1540
1541   Args:
1542     export: the name of the export to remove
1543
1544   Returns:
1545     False in case of error, True otherwise.
1546
1547   """
1548   target = os.path.join(constants.EXPORT_DIR, export)
1549
1550   shutil.rmtree(target)
1551   # TODO: catch some of the relevant exceptions and provide a pretty
1552   # error message if rmtree fails.
1553
1554   return True
1555
1556
1557 def RenameBlockDevices(devlist):
1558   """Rename a list of block devices.
1559
1560   The devlist argument is a list of tuples (disk, new_logical,
1561   new_physical). The return value will be a combined boolean result
1562   (True only if all renames succeeded).
1563
1564   """
1565   result = True
1566   for disk, unique_id in devlist:
1567     dev = _RecursiveFindBD(disk)
1568     if dev is None:
1569       result = False
1570       continue
1571     try:
1572       old_rpath = dev.dev_path
1573       dev.Rename(unique_id)
1574       new_rpath = dev.dev_path
1575       if old_rpath != new_rpath:
1576         DevCacheManager.RemoveCache(old_rpath)
1577         # FIXME: we should add the new cache information here, like:
1578         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1579         # but we don't have the owner here - maybe parse from existing
1580         # cache? for now, we only lose lvm data when we rename, which
1581         # is less critical than DRBD or MD
1582     except errors.BlockDeviceError, err:
1583       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1584       result = False
1585   return result
1586
1587
1588 def _TransformFileStorageDir(file_storage_dir):
1589   """Checks whether given file_storage_dir is valid.
1590
1591   Checks wheter the given file_storage_dir is within the cluster-wide
1592   default file_storage_dir stored in SimpleStore. Only paths under that
1593   directory are allowed.
1594
1595   Args:
1596     file_storage_dir: string with path
1597
1598   Returns:
1599     normalized file_storage_dir (string) if valid, None otherwise
1600
1601   """
1602   cfg = _GetConfig()
1603   file_storage_dir = os.path.normpath(file_storage_dir)
1604   base_file_storage_dir = cfg.GetFileStorageDir()
1605   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1606       base_file_storage_dir):
1607     logging.error("file storage directory '%s' is not under base file"
1608                   " storage directory '%s'",
1609                   file_storage_dir, base_file_storage_dir)
1610     return None
1611   return file_storage_dir
1612
1613
1614 def CreateFileStorageDir(file_storage_dir):
1615   """Create file storage directory.
1616
1617   Args:
1618     file_storage_dir: string containing the path
1619
1620   Returns:
1621     tuple with first element a boolean indicating wheter dir
1622     creation was successful or not
1623
1624   """
1625   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1626   result = True,
1627   if not file_storage_dir:
1628     result = False,
1629   else:
1630     if os.path.exists(file_storage_dir):
1631       if not os.path.isdir(file_storage_dir):
1632         logging.error("'%s' is not a directory", file_storage_dir)
1633         result = False,
1634     else:
1635       try:
1636         os.makedirs(file_storage_dir, 0750)
1637       except OSError, err:
1638         logging.error("Cannot create file storage directory '%s': %s",
1639                       file_storage_dir, err)
1640         result = False,
1641   return result
1642
1643
1644 def RemoveFileStorageDir(file_storage_dir):
1645   """Remove file storage directory.
1646
1647   Remove it only if it's empty. If not log an error and return.
1648
1649   Args:
1650     file_storage_dir: string containing the path
1651
1652   Returns:
1653     tuple with first element a boolean indicating wheter dir
1654     removal was successful or not
1655
1656   """
1657   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1658   result = True,
1659   if not file_storage_dir:
1660     result = False,
1661   else:
1662     if os.path.exists(file_storage_dir):
1663       if not os.path.isdir(file_storage_dir):
1664         logging.error("'%s' is not a directory", file_storage_dir)
1665         result = False,
1666       # deletes dir only if empty, otherwise we want to return False
1667       try:
1668         os.rmdir(file_storage_dir)
1669       except OSError, err:
1670         logging.exception("Cannot remove file storage directory '%s'",
1671                           file_storage_dir)
1672         result = False,
1673   return result
1674
1675
1676 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1677   """Rename the file storage directory.
1678
1679   Args:
1680     old_file_storage_dir: string containing the old path
1681     new_file_storage_dir: string containing the new path
1682
1683   Returns:
1684     tuple with first element a boolean indicating wheter dir
1685     rename was successful or not
1686
1687   """
1688   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1689   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1690   result = True,
1691   if not old_file_storage_dir or not new_file_storage_dir:
1692     result = False,
1693   else:
1694     if not os.path.exists(new_file_storage_dir):
1695       if os.path.isdir(old_file_storage_dir):
1696         try:
1697           os.rename(old_file_storage_dir, new_file_storage_dir)
1698         except OSError, err:
1699           logging.exception("Cannot rename '%s' to '%s'",
1700                             old_file_storage_dir, new_file_storage_dir)
1701           result =  False,
1702       else:
1703         logging.error("'%s' is not a directory", old_file_storage_dir)
1704         result = False,
1705     else:
1706       if os.path.exists(old_file_storage_dir):
1707         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1708                       old_file_storage_dir, new_file_storage_dir)
1709         result = False,
1710   return result
1711
1712
1713 def _IsJobQueueFile(file_name):
1714   """Checks whether the given filename is in the queue directory.
1715
1716   """
1717   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1718   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1719
1720   if not result:
1721     logging.error("'%s' is not a file in the queue directory",
1722                   file_name)
1723
1724   return result
1725
1726
1727 def JobQueueUpdate(file_name, content):
1728   """Updates a file in the queue directory.
1729
1730   """
1731   if not _IsJobQueueFile(file_name):
1732     return False
1733
1734   # Write and replace the file atomically
1735   utils.WriteFile(file_name, data=content)
1736
1737   return True
1738
1739
1740 def JobQueueRename(old, new):
1741   """Renames a job queue file.
1742
1743   """
1744   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1745     return False
1746
1747   os.rename(old, new)
1748
1749   return True
1750
1751
1752 def JobQueueSetDrainFlag(drain_flag):
1753   """Set the drain flag for the queue.
1754
1755   This will set or unset the queue drain flag.
1756
1757   @type drain_flag: bool
1758   @param drain_flag: if True, will set the drain flag, otherwise reset it.
1759
1760   """
1761   if drain_flag:
1762     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1763   else:
1764     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1765
1766   return True
1767
1768
1769 def CloseBlockDevices(disks):
1770   """Closes the given block devices.
1771
1772   This means they will be switched to secondary mode (in case of DRBD).
1773
1774   """
1775   bdevs = []
1776   for cf in disks:
1777     rd = _RecursiveFindBD(cf)
1778     if rd is None:
1779       return (False, "Can't find device %s" % cf)
1780     bdevs.append(rd)
1781
1782   msg = []
1783   for rd in bdevs:
1784     try:
1785       rd.Close()
1786     except errors.BlockDeviceError, err:
1787       msg.append(str(err))
1788   if msg:
1789     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1790   else:
1791     return (True, "All devices secondary")
1792
1793
1794 def ValidateHVParams(hvname, hvparams):
1795   """Validates the given hypervisor parameters.
1796
1797   @type hvname: string
1798   @param hvname: the hypervisor name
1799   @type hvparams: dict
1800   @param hvparams: the hypervisor parameters to be validated
1801   @rtype: tuple (bool, str)
1802   @return: tuple of (success, message)
1803
1804   """
1805   try:
1806     hv_type = hypervisor.GetHypervisor(hvname)
1807     hv_type.ValidateParameters(hvparams)
1808     return (True, "Validation passed")
1809   except errors.HypervisorError, err:
1810     return (False, str(err))
1811
1812
1813 class HooksRunner(object):
1814   """Hook runner.
1815
1816   This class is instantiated on the node side (ganeti-noded) and not on
1817   the master side.
1818
1819   """
1820   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1821
1822   def __init__(self, hooks_base_dir=None):
1823     """Constructor for hooks runner.
1824
1825     Args:
1826       - hooks_base_dir: if not None, this overrides the
1827         constants.HOOKS_BASE_DIR (useful for unittests)
1828
1829     """
1830     if hooks_base_dir is None:
1831       hooks_base_dir = constants.HOOKS_BASE_DIR
1832     self._BASE_DIR = hooks_base_dir
1833
1834   @staticmethod
1835   def ExecHook(script, env):
1836     """Exec one hook script.
1837
1838     Args:
1839      - script: the full path to the script
1840      - env: the environment with which to exec the script
1841
1842     """
1843     # exec the process using subprocess and log the output
1844     fdstdin = None
1845     try:
1846       fdstdin = open("/dev/null", "r")
1847       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1848                                stderr=subprocess.STDOUT, close_fds=True,
1849                                shell=False, cwd="/", env=env)
1850       output = ""
1851       try:
1852         output = child.stdout.read(4096)
1853         child.stdout.close()
1854       except EnvironmentError, err:
1855         output += "Hook script error: %s" % str(err)
1856
1857       while True:
1858         try:
1859           result = child.wait()
1860           break
1861         except EnvironmentError, err:
1862           if err.errno == errno.EINTR:
1863             continue
1864           raise
1865     finally:
1866       # try not to leak fds
1867       for fd in (fdstdin, ):
1868         if fd is not None:
1869           try:
1870             fd.close()
1871           except EnvironmentError, err:
1872             # just log the error
1873             #logging.exception("Error while closing fd %s", fd)
1874             pass
1875
1876     return result == 0, output
1877
1878   def RunHooks(self, hpath, phase, env):
1879     """Run the scripts in the hooks directory.
1880
1881     This method will not be usually overriden by child opcodes.
1882
1883     """
1884     if phase == constants.HOOKS_PHASE_PRE:
1885       suffix = "pre"
1886     elif phase == constants.HOOKS_PHASE_POST:
1887       suffix = "post"
1888     else:
1889       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1890     rr = []
1891
1892     subdir = "%s-%s.d" % (hpath, suffix)
1893     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1894     try:
1895       dir_contents = utils.ListVisibleFiles(dir_name)
1896     except OSError, err:
1897       # must log
1898       return rr
1899
1900     # we use the standard python sort order,
1901     # so 00name is the recommended naming scheme
1902     dir_contents.sort()
1903     for relname in dir_contents:
1904       fname = os.path.join(dir_name, relname)
1905       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1906           self.RE_MASK.match(relname) is not None):
1907         rrval = constants.HKR_SKIP
1908         output = ""
1909       else:
1910         result, output = self.ExecHook(fname, env)
1911         if not result:
1912           rrval = constants.HKR_FAIL
1913         else:
1914           rrval = constants.HKR_SUCCESS
1915       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1916
1917     return rr
1918
1919
1920 class IAllocatorRunner(object):
1921   """IAllocator runner.
1922
1923   This class is instantiated on the node side (ganeti-noded) and not on
1924   the master side.
1925
1926   """
1927   def Run(self, name, idata):
1928     """Run an iallocator script.
1929
1930     Return value: tuple of:
1931        - run status (one of the IARUN_ constants)
1932        - stdout
1933        - stderr
1934        - fail reason (as from utils.RunResult)
1935
1936     """
1937     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1938                                   os.path.isfile)
1939     if alloc_script is None:
1940       return (constants.IARUN_NOTFOUND, None, None, None)
1941
1942     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1943     try:
1944       os.write(fd, idata)
1945       os.close(fd)
1946       result = utils.RunCmd([alloc_script, fin_name])
1947       if result.failed:
1948         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1949                 result.fail_reason)
1950     finally:
1951       os.unlink(fin_name)
1952
1953     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1954
1955
1956 class DevCacheManager(object):
1957   """Simple class for managing a cache of block device information.
1958
1959   """
1960   _DEV_PREFIX = "/dev/"
1961   _ROOT_DIR = constants.BDEV_CACHE_DIR
1962
1963   @classmethod
1964   def _ConvertPath(cls, dev_path):
1965     """Converts a /dev/name path to the cache file name.
1966
1967     This replaces slashes with underscores and strips the /dev
1968     prefix. It then returns the full path to the cache file
1969
1970     """
1971     if dev_path.startswith(cls._DEV_PREFIX):
1972       dev_path = dev_path[len(cls._DEV_PREFIX):]
1973     dev_path = dev_path.replace("/", "_")
1974     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1975     return fpath
1976
1977   @classmethod
1978   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1979     """Updates the cache information for a given device.
1980
1981     """
1982     if dev_path is None:
1983       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1984       return
1985     fpath = cls._ConvertPath(dev_path)
1986     if on_primary:
1987       state = "primary"
1988     else:
1989       state = "secondary"
1990     if iv_name is None:
1991       iv_name = "not_visible"
1992     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1993     try:
1994       utils.WriteFile(fpath, data=fdata)
1995     except EnvironmentError, err:
1996       logging.exception("Can't update bdev cache for %s", dev_path)
1997
1998   @classmethod
1999   def RemoveCache(cls, dev_path):
2000     """Remove data for a dev_path.
2001
2002     """
2003     if dev_path is None:
2004       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2005       return
2006     fpath = cls._ConvertPath(dev_path)
2007     try:
2008       utils.RemoveFile(fpath)
2009     except EnvironmentError, err:
2010       logging.exception("Can't update bdev cache for %s", dev_path)