Convert SnapshotBlockDevice's docstring to epydoc
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetConfig():
48   return ssconf.SimpleConfigReader()
49
50
51 def _GetSshRunner(cluster_name):
52   return ssh.SshRunner(cluster_name)
53
54
55 def _CleanDirectory(path, exclude=[]):
56   """Removes all regular files in a directory.
57
58   @param exclude: List of files to be excluded.
59   @type exclude: list
60
61   """
62   if not os.path.isdir(path):
63     return
64
65   # Normalize excluded paths
66   exclude = [os.path.normpath(i) for i in exclude]
67
68   for rel_name in utils.ListVisibleFiles(path):
69     full_name = os.path.normpath(os.path.join(path, rel_name))
70     if full_name in exclude:
71       continue
72     if os.path.isfile(full_name) and not os.path.islink(full_name):
73       utils.RemoveFile(full_name)
74
75
76 def JobQueuePurge():
77   """Removes job queue files and archived jobs
78
79   """
80   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82
83
84 def GetMasterInfo():
85   """Returns master information.
86
87   This is an utility function to compute master information, either
88   for consumption here or from the node daemon.
89
90   @rtype: tuple
91   @return: (master_netdev, master_ip, master_name)
92
93   """
94   try:
95     cfg = _GetConfig()
96     master_netdev = cfg.GetMasterNetdev()
97     master_ip = cfg.GetMasterIP()
98     master_node = cfg.GetMasterNode()
99   except errors.ConfigurationError, err:
100     logging.exception("Cluster configuration incomplete")
101     return (None, None)
102   return (master_netdev, master_ip, master_node)
103
104
105 def StartMaster(start_daemons):
106   """Activate local node as master node.
107
108   The function will always try activate the IP address of the master
109   (if someone else has it, then it won't). Then, if the start_daemons
110   parameter is True, it will also start the master daemons
111   (ganet-masterd and ganeti-rapi).
112
113   """
114   ok = True
115   master_netdev, master_ip, _ = GetMasterInfo()
116   if not master_netdev:
117     return False
118
119   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120     if utils.OwnIpAddress(master_ip):
121       # we already have the ip:
122       logging.debug("Already started")
123     else:
124       logging.error("Someone else has the master ip, not activating")
125       ok = False
126   else:
127     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
128                            "dev", master_netdev, "label",
129                            "%s:0" % master_netdev])
130     if result.failed:
131       logging.error("Can't activate master IP: %s", result.output)
132       ok = False
133
134     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
135                            "-s", master_ip, master_ip])
136     # we'll ignore the exit code of arping
137
138   # and now start the master and rapi daemons
139   if start_daemons:
140     for daemon in 'ganeti-masterd', 'ganeti-rapi':
141       result = utils.RunCmd([daemon])
142       if result.failed:
143         logging.error("Can't start daemon %s: %s", daemon, result.output)
144         ok = False
145   return ok
146
147
148 def StopMaster(stop_daemons):
149   """Deactivate this node as master.
150
151   The function will always try to deactivate the IP address of the
152   master. Then, if the stop_daemons parameter is True, it will also
153   stop the master daemons (ganet-masterd and ganeti-rapi).
154
155   """
156   master_netdev, master_ip, _ = GetMasterInfo()
157   if not master_netdev:
158     return False
159
160   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
161                          "dev", master_netdev])
162   if result.failed:
163     logging.error("Can't remove the master IP, error: %s", result.output)
164     # but otherwise ignore the failure
165
166   if stop_daemons:
167     # stop/kill the rapi and the master daemon
168     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
169       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
170
171   return True
172
173
174 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175   """Joins this node to the cluster.
176
177   This does the following:
178       - updates the hostkeys of the machine (rsa and dsa)
179       - adds the ssh private key to the user
180       - adds the ssh public key to the users' authorized_keys file
181
182   """
183   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187   for name, content, mode in sshd_keys:
188     utils.WriteFile(name, data=content, mode=mode)
189
190   try:
191     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
192                                                     mkdir=True)
193   except errors.OpExecError, err:
194     logging.exception("Error while processing user ssh files")
195     return False
196
197   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198     utils.WriteFile(name, data=content, mode=0600)
199
200   utils.AddAuthorizedKey(auth_keys, sshpub)
201
202   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
203
204   return True
205
206
207 def LeaveCluster():
208   """Cleans up the current node and prepares it to be removed from the cluster.
209
210   """
211   _CleanDirectory(constants.DATA_DIR)
212   JobQueuePurge()
213
214   try:
215     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
216   except errors.OpExecError:
217     logging.exception("Error while processing ssh files")
218     return
219
220   f = open(pub_key, 'r')
221   try:
222     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
223   finally:
224     f.close()
225
226   utils.RemoveFile(priv_key)
227   utils.RemoveFile(pub_key)
228
229   # Return a reassuring string to the caller, and quit
230   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231
232
233 def GetNodeInfo(vgname, hypervisor_type):
234   """Gives back a hash with different informations about the node.
235
236   @type vgname: C{string}
237   @param vgname: the name of the volume group to ask for disk space information
238   @type hypervisor_type: C{str}
239   @param hypervisor_type: the name of the hypervisor to ask for
240       memory information
241   @rtype: C{dict}
242   @return: dictionary with the following keys:
243       - vg_size is the size of the configured volume group in MiB
244       - vg_free is the free size of the volume group in MiB
245       - memory_dom0 is the memory allocated for domain0 in MiB
246       - memory_free is the currently available (free) ram in MiB
247       - memory_total is the total number of ram in MiB
248
249   """
250   outputarray = {}
251   vginfo = _GetVGInfo(vgname)
252   outputarray['vg_size'] = vginfo['vg_size']
253   outputarray['vg_free'] = vginfo['vg_free']
254
255   hyper = hypervisor.GetHypervisor(hypervisor_type)
256   hyp_info = hyper.GetNodeInfo()
257   if hyp_info is not None:
258     outputarray.update(hyp_info)
259
260   f = open("/proc/sys/kernel/random/boot_id", 'r')
261   try:
262     outputarray["bootid"] = f.read(128).rstrip("\n")
263   finally:
264     f.close()
265
266   return outputarray
267
268
269 def VerifyNode(what, cluster_name):
270   """Verify the status of the local node.
271
272   Based on the input L{what} parameter, various checks are done on the
273   local node.
274
275   If the I{filelist} key is present, this list of
276   files is checksummed and the file/checksum pairs are returned.
277
278   If the I{nodelist} key is present, we check that we have
279   connectivity via ssh with the target nodes (and check the hostname
280   report).
281
282   If the I{node-net-test} key is present, we check that we have
283   connectivity to the given nodes via both primary IP and, if
284   applicable, secondary IPs.
285
286   @type what: C{dict}
287   @param what: a dictionary of things to check:
288       - filelist: list of files for which to compute checksums
289       - nodelist: list of nodes we should check ssh communication with
290       - node-net-test: list of nodes we should check node daemon port
291         connectivity with
292       - hypervisor: list with hypervisors to run the verify for
293
294   """
295   result = {}
296
297   if 'hypervisor' in what:
298     result['hypervisor'] = my_dict = {}
299     for hv_name in what['hypervisor']:
300       my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
301
302   if 'filelist' in what:
303     result['filelist'] = utils.FingerprintFiles(what['filelist'])
304
305   if 'nodelist' in what:
306     result['nodelist'] = {}
307     random.shuffle(what['nodelist'])
308     for node in what['nodelist']:
309       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
310       if not success:
311         result['nodelist'][node] = message
312   if 'node-net-test' in what:
313     result['node-net-test'] = {}
314     my_name = utils.HostInfo().name
315     my_pip = my_sip = None
316     for name, pip, sip in what['node-net-test']:
317       if name == my_name:
318         my_pip = pip
319         my_sip = sip
320         break
321     if not my_pip:
322       result['node-net-test'][my_name] = ("Can't find my own"
323                                           " primary/secondary IP"
324                                           " in the node list")
325     else:
326       port = utils.GetNodeDaemonPort()
327       for name, pip, sip in what['node-net-test']:
328         fail = []
329         if not utils.TcpPing(pip, port, source=my_pip):
330           fail.append("primary")
331         if sip != pip:
332           if not utils.TcpPing(sip, port, source=my_sip):
333             fail.append("secondary")
334         if fail:
335           result['node-net-test'][name] = ("failure using the %s"
336                                            " interface(s)" %
337                                            " and ".join(fail))
338
339   return result
340
341
342 def GetVolumeList(vg_name):
343   """Compute list of logical volumes and their size.
344
345   Returns:
346     dictionary of all partions (key) with their size (in MiB), inactive
347     and online status:
348     {'test1': ('20.06', True, True)}
349
350   """
351   lvs = {}
352   sep = '|'
353   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
354                          "--separator=%s" % sep,
355                          "-olv_name,lv_size,lv_attr", vg_name])
356   if result.failed:
357     logging.error("Failed to list logical volumes, lvs output: %s",
358                   result.output)
359     return result.output
360
361   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
362   for line in result.stdout.splitlines():
363     line = line.strip()
364     match = valid_line_re.match(line)
365     if not match:
366       logging.error("Invalid line returned from lvs output: '%s'", line)
367       continue
368     name, size, attr = match.groups()
369     inactive = attr[4] == '-'
370     online = attr[5] == 'o'
371     lvs[name] = (size, inactive, online)
372
373   return lvs
374
375
376 def ListVolumeGroups():
377   """List the volume groups and their size.
378
379   Returns:
380     Dictionary with keys volume name and values the size of the volume
381
382   """
383   return utils.ListVolumeGroups()
384
385
386 def NodeVolumes():
387   """List all volumes on this node.
388
389   """
390   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
391                          "--separator=|",
392                          "--options=lv_name,lv_size,devices,vg_name"])
393   if result.failed:
394     logging.error("Failed to list logical volumes, lvs output: %s",
395                   result.output)
396     return {}
397
398   def parse_dev(dev):
399     if '(' in dev:
400       return dev.split('(')[0]
401     else:
402       return dev
403
404   def map_line(line):
405     return {
406       'name': line[0].strip(),
407       'size': line[1].strip(),
408       'dev': parse_dev(line[2].strip()),
409       'vg': line[3].strip(),
410     }
411
412   return [map_line(line.split('|')) for line in result.stdout.splitlines()
413           if line.count('|') >= 3]
414
415
416 def BridgesExist(bridges_list):
417   """Check if a list of bridges exist on the current node.
418
419   Returns:
420     True if all of them exist, false otherwise
421
422   """
423   for bridge in bridges_list:
424     if not utils.BridgeExists(bridge):
425       return False
426
427   return True
428
429
430 def GetInstanceList(hypervisor_list):
431   """Provides a list of instances.
432
433   @type hypervisor_list: list
434   @param hypervisor_list: the list of hypervisors to query information
435
436   @rtype: list
437   @return: a list of all running instances on the current node
438              - instance1.example.com
439              - instance2.example.com
440
441   """
442   results = []
443   for hname in hypervisor_list:
444     try:
445       names = hypervisor.GetHypervisor(hname).ListInstances()
446       results.extend(names)
447     except errors.HypervisorError, err:
448       logging.exception("Error enumerating instances for hypevisor %s", hname)
449       # FIXME: should we somehow not propagate this to the master?
450       raise
451
452   return results
453
454
455 def GetInstanceInfo(instance, hname):
456   """Gives back the informations about an instance as a dictionary.
457
458   @type instance: string
459   @param instance: the instance name
460   @type hname: string
461   @param hname: the hypervisor type of the instance
462
463   @rtype: dict
464   @return: dictionary with the following keys:
465       - memory: memory size of instance (int)
466       - state: xen state of instance (string)
467       - time: cpu time of instance (float)
468
469   """
470   output = {}
471
472   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
473   if iinfo is not None:
474     output['memory'] = iinfo[2]
475     output['state'] = iinfo[4]
476     output['time'] = iinfo[5]
477
478   return output
479
480
481 def GetAllInstancesInfo(hypervisor_list):
482   """Gather data about all instances.
483
484   This is the equivalent of `GetInstanceInfo()`, except that it
485   computes data for all instances at once, thus being faster if one
486   needs data about more than one instance.
487
488   @type hypervisor_list: list
489   @param hypervisor_list: list of hypervisors to query for instance data
490
491   @rtype: dict of dicts
492   @return: dictionary of instance: data, with data having the following keys:
493       - memory: memory size of instance (int)
494       - state: xen state of instance (string)
495       - time: cpu time of instance (float)
496       - vcpuus: the number of vcpus
497
498   """
499   output = {}
500
501   for hname in hypervisor_list:
502     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
503     if iinfo:
504       for name, inst_id, memory, vcpus, state, times in iinfo:
505         value = {
506           'memory': memory,
507           'vcpus': vcpus,
508           'state': state,
509           'time': times,
510           }
511         if name in output and output[name] != value:
512           raise errors.HypervisorError("Instance %s running duplicate"
513                                        " with different parameters" % name)
514         output[name] = value
515
516   return output
517
518
519 def AddOSToInstance(instance):
520   """Add an OS to an instance.
521
522   @type instance: L{objects.Instance}
523   @param instance: Instance whose OS is to be installed
524
525   """
526   inst_os = OSFromDisk(instance.os)
527
528   create_script = inst_os.create_script
529   create_env = OSEnvironment(instance)
530
531   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
532                                      instance.name, int(time.time()))
533   if not os.path.exists(constants.LOG_OS_DIR):
534     os.mkdir(constants.LOG_OS_DIR, 0750)
535
536   command = utils.BuildShellCmd("cd %s && %s &>%s",
537                                 inst_os.path, create_script, logfile)
538
539   result = utils.RunCmd(command, env=create_env)
540   if result.failed:
541     logging.error("os create command '%s' returned error: %s, logfile: %s,"
542                   " output: %s", command, result.fail_reason, logfile,
543                   result.output)
544     return False
545
546   return True
547
548
549 def RunRenameInstance(instance, old_name):
550   """Run the OS rename script for an instance.
551
552   @type instance: objects.Instance
553   @param instance: Instance whose OS is to be installed
554   @type old_name: string
555   @param old_name: previous instance name
556
557   """
558   inst_os = OSFromDisk(instance.os)
559
560   script = inst_os.rename_script
561   rename_env = OSEnvironment(instance)
562   rename_env['OLD_INSTANCE_NAME'] = old_name
563
564   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
565                                            old_name,
566                                            instance.name, int(time.time()))
567   if not os.path.exists(constants.LOG_OS_DIR):
568     os.mkdir(constants.LOG_OS_DIR, 0750)
569
570   command = utils.BuildShellCmd("cd %s && %s &>%s",
571                                 inst_os.path, script, logfile)
572
573   result = utils.RunCmd(command, env=rename_env)
574
575   if result.failed:
576     logging.error("os create command '%s' returned error: %s output: %s",
577                   command, result.fail_reason, result.output)
578     return False
579
580   return True
581
582
583 def _GetVGInfo(vg_name):
584   """Get informations about the volume group.
585
586   Args:
587     vg_name: the volume group
588
589   Returns:
590     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
591     where
592     vg_size is the total size of the volume group in MiB
593     vg_free is the free size of the volume group in MiB
594     pv_count are the number of physical disks in that vg
595
596   If an error occurs during gathering of data, we return the same dict
597   with keys all set to None.
598
599   """
600   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
601
602   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
603                          "--nosuffix", "--units=m", "--separator=:", vg_name])
604
605   if retval.failed:
606     logging.error("volume group %s not present", vg_name)
607     return retdic
608   valarr = retval.stdout.strip().rstrip(':').split(':')
609   if len(valarr) == 3:
610     try:
611       retdic = {
612         "vg_size": int(round(float(valarr[0]), 0)),
613         "vg_free": int(round(float(valarr[1]), 0)),
614         "pv_count": int(valarr[2]),
615         }
616     except ValueError, err:
617       logging.exception("Fail to parse vgs output")
618   else:
619     logging.error("vgs output has the wrong number of fields (expected"
620                   " three): %s", str(valarr))
621   return retdic
622
623
624 def _GatherBlockDevs(instance):
625   """Set up an instance's block device(s).
626
627   This is run on the primary node at instance startup. The block
628   devices must be already assembled.
629
630   """
631   block_devices = []
632   for disk in instance.disks:
633     device = _RecursiveFindBD(disk)
634     if device is None:
635       raise errors.BlockDeviceError("Block device '%s' is not set up." %
636                                     str(disk))
637     device.Open()
638     block_devices.append((disk, device))
639   return block_devices
640
641
642 def StartInstance(instance, extra_args):
643   """Start an instance.
644
645   @type instance: instance object
646   @param instance: the instance object
647   @rtype: boolean
648   @return: whether the startup was successful or not
649
650   """
651   running_instances = GetInstanceList([instance.hypervisor])
652
653   if instance.name in running_instances:
654     return True
655
656   block_devices = _GatherBlockDevs(instance)
657   hyper = hypervisor.GetHypervisor(instance.hypervisor)
658
659   try:
660     hyper.StartInstance(instance, block_devices, extra_args)
661   except errors.HypervisorError, err:
662     logging.exception("Failed to start instance")
663     return False
664
665   return True
666
667
668 def ShutdownInstance(instance):
669   """Shut an instance down.
670
671   @type instance: instance object
672   @param instance: the instance object
673   @rtype: boolean
674   @return: whether the startup was successful or not
675
676   """
677   hv_name = instance.hypervisor
678   running_instances = GetInstanceList([hv_name])
679
680   if instance.name not in running_instances:
681     return True
682
683   hyper = hypervisor.GetHypervisor(hv_name)
684   try:
685     hyper.StopInstance(instance)
686   except errors.HypervisorError, err:
687     logging.error("Failed to stop instance")
688     return False
689
690   # test every 10secs for 2min
691   shutdown_ok = False
692
693   time.sleep(1)
694   for dummy in range(11):
695     if instance.name not in GetInstanceList([hv_name]):
696       break
697     time.sleep(10)
698   else:
699     # the shutdown did not succeed
700     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
701
702     try:
703       hyper.StopInstance(instance, force=True)
704     except errors.HypervisorError, err:
705       logging.exception("Failed to stop instance")
706       return False
707
708     time.sleep(1)
709     if instance.name in GetInstanceList([hv_name]):
710       logging.error("could not shutdown instance '%s' even by destroy",
711                     instance.name)
712       return False
713
714   return True
715
716
717 def RebootInstance(instance, reboot_type, extra_args):
718   """Reboot an instance.
719
720   Args:
721     instance    - name of instance to reboot
722     reboot_type - how to reboot [soft,hard,full]
723
724   """
725   running_instances = GetInstanceList([instance.hypervisor])
726
727   if instance.name not in running_instances:
728     logging.error("Cannot reboot instance that is not running")
729     return False
730
731   hyper = hypervisor.GetHypervisor(instance.hypervisor)
732   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
733     try:
734       hyper.RebootInstance(instance)
735     except errors.HypervisorError, err:
736       logging.exception("Failed to soft reboot instance")
737       return False
738   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
739     try:
740       ShutdownInstance(instance)
741       StartInstance(instance, extra_args)
742     except errors.HypervisorError, err:
743       logging.exception("Failed to hard reboot instance")
744       return False
745   else:
746     raise errors.ParameterError("reboot_type invalid")
747
748   return True
749
750
751 def MigrateInstance(instance, target, live):
752   """Migrates an instance to another node.
753
754   @type instance: C{objects.Instance}
755   @param instance: the instance definition
756   @type target: string
757   @param target: the target node name
758   @type live: boolean
759   @param live: whether the migration should be done live or not (the
760       interpretation of this parameter is left to the hypervisor)
761   @rtype: tuple
762   @return: a tuple of (success, msg) where:
763       - succes is a boolean denoting the success/failure of the operation
764       - msg is a string with details in case of failure
765
766   """
767   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
768
769   try:
770     hyper.MigrateInstance(instance.name, target, live)
771   except errors.HypervisorError, err:
772     msg = "Failed to migrate instance: %s" % str(err)
773     logging.error(msg)
774     return (False, msg)
775   return (True, "Migration successfull")
776
777
778 def CreateBlockDevice(disk, size, owner, on_primary, info):
779   """Creates a block device for an instance.
780
781   Args:
782    disk: a ganeti.objects.Disk object
783    size: the size of the physical underlying device
784    owner: a string with the name of the instance
785    on_primary: a boolean indicating if it is the primary node or not
786    info: string that will be sent to the physical device creation
787
788   Returns:
789     the new unique_id of the device (this can sometime be
790     computed only after creation), or None. On secondary nodes,
791     it's not required to return anything.
792
793   """
794   clist = []
795   if disk.children:
796     for child in disk.children:
797       crdev = _RecursiveAssembleBD(child, owner, on_primary)
798       if on_primary or disk.AssembleOnSecondary():
799         # we need the children open in case the device itself has to
800         # be assembled
801         crdev.Open()
802       clist.append(crdev)
803   try:
804     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
805     if device is not None:
806       logging.info("removing existing device %s", disk)
807       device.Remove()
808   except errors.BlockDeviceError, err:
809     pass
810
811   device = bdev.Create(disk.dev_type, disk.physical_id,
812                        clist, size)
813   if device is None:
814     raise ValueError("Can't create child device for %s, %s" %
815                      (disk, size))
816   if on_primary or disk.AssembleOnSecondary():
817     if not device.Assemble():
818       errorstring = "Can't assemble device after creation"
819       logging.error(errorstring)
820       raise errors.BlockDeviceError("%s, very unusual event - check the node"
821                                     " daemon logs" % errorstring)
822     device.SetSyncSpeed(constants.SYNC_SPEED)
823     if on_primary or disk.OpenOnSecondary():
824       device.Open(force=True)
825     DevCacheManager.UpdateCache(device.dev_path, owner,
826                                 on_primary, disk.iv_name)
827
828   device.SetInfo(info)
829
830   physical_id = device.unique_id
831   return physical_id
832
833
834 def RemoveBlockDevice(disk):
835   """Remove a block device.
836
837   This is intended to be called recursively.
838
839   """
840   try:
841     # since we are removing the device, allow a partial match
842     # this allows removal of broken mirrors
843     rdev = _RecursiveFindBD(disk, allow_partial=True)
844   except errors.BlockDeviceError, err:
845     # probably can't attach
846     logging.info("Can't attach to device %s in remove", disk)
847     rdev = None
848   if rdev is not None:
849     r_path = rdev.dev_path
850     result = rdev.Remove()
851     if result:
852       DevCacheManager.RemoveCache(r_path)
853   else:
854     result = True
855   if disk.children:
856     for child in disk.children:
857       result = result and RemoveBlockDevice(child)
858   return result
859
860
861 def _RecursiveAssembleBD(disk, owner, as_primary):
862   """Activate a block device for an instance.
863
864   This is run on the primary and secondary nodes for an instance.
865
866   This function is called recursively.
867
868   Args:
869     disk: a objects.Disk object
870     as_primary: if we should make the block device read/write
871
872   Returns:
873     the assembled device or None (in case no device was assembled)
874
875   If the assembly is not successful, an exception is raised.
876
877   """
878   children = []
879   if disk.children:
880     mcn = disk.ChildrenNeeded()
881     if mcn == -1:
882       mcn = 0 # max number of Nones allowed
883     else:
884       mcn = len(disk.children) - mcn # max number of Nones
885     for chld_disk in disk.children:
886       try:
887         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
888       except errors.BlockDeviceError, err:
889         if children.count(None) >= mcn:
890           raise
891         cdev = None
892         logging.debug("Error in child activation: %s", str(err))
893       children.append(cdev)
894
895   if as_primary or disk.AssembleOnSecondary():
896     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
897     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
898     result = r_dev
899     if as_primary or disk.OpenOnSecondary():
900       r_dev.Open()
901     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
902                                 as_primary, disk.iv_name)
903
904   else:
905     result = True
906   return result
907
908
909 def AssembleBlockDevice(disk, owner, as_primary):
910   """Activate a block device for an instance.
911
912   This is a wrapper over _RecursiveAssembleBD.
913
914   Returns:
915     a /dev path for primary nodes
916     True for secondary nodes
917
918   """
919   result = _RecursiveAssembleBD(disk, owner, as_primary)
920   if isinstance(result, bdev.BlockDev):
921     result = result.dev_path
922   return result
923
924
925 def ShutdownBlockDevice(disk):
926   """Shut down a block device.
927
928   First, if the device is assembled (can `Attach()`), then the device
929   is shutdown. Then the children of the device are shutdown.
930
931   This function is called recursively. Note that we don't cache the
932   children or such, as oppossed to assemble, shutdown of different
933   devices doesn't require that the upper device was active.
934
935   """
936   r_dev = _RecursiveFindBD(disk)
937   if r_dev is not None:
938     r_path = r_dev.dev_path
939     result = r_dev.Shutdown()
940     if result:
941       DevCacheManager.RemoveCache(r_path)
942   else:
943     result = True
944   if disk.children:
945     for child in disk.children:
946       result = result and ShutdownBlockDevice(child)
947   return result
948
949
950 def MirrorAddChildren(parent_cdev, new_cdevs):
951   """Extend a mirrored block device.
952
953   """
954   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
955   if parent_bdev is None:
956     logging.error("Can't find parent device")
957     return False
958   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
959   if new_bdevs.count(None) > 0:
960     logging.error("Can't find new device(s) to add: %s:%s",
961                   new_bdevs, new_cdevs)
962     return False
963   parent_bdev.AddChildren(new_bdevs)
964   return True
965
966
967 def MirrorRemoveChildren(parent_cdev, new_cdevs):
968   """Shrink a mirrored block device.
969
970   """
971   parent_bdev = _RecursiveFindBD(parent_cdev)
972   if parent_bdev is None:
973     logging.error("Can't find parent in remove children: %s", parent_cdev)
974     return False
975   devs = []
976   for disk in new_cdevs:
977     rpath = disk.StaticDevPath()
978     if rpath is None:
979       bd = _RecursiveFindBD(disk)
980       if bd is None:
981         logging.error("Can't find dynamic device %s while removing children",
982                       disk)
983         return False
984       else:
985         devs.append(bd.dev_path)
986     else:
987       devs.append(rpath)
988   parent_bdev.RemoveChildren(devs)
989   return True
990
991
992 def GetMirrorStatus(disks):
993   """Get the mirroring status of a list of devices.
994
995   Args:
996     disks: list of `objects.Disk`
997
998   Returns:
999     list of (mirror_done, estimated_time) tuples, which
1000     are the result of bdev.BlockDevice.CombinedSyncStatus()
1001
1002   """
1003   stats = []
1004   for dsk in disks:
1005     rbd = _RecursiveFindBD(dsk)
1006     if rbd is None:
1007       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1008     stats.append(rbd.CombinedSyncStatus())
1009   return stats
1010
1011
1012 def _RecursiveFindBD(disk, allow_partial=False):
1013   """Check if a device is activated.
1014
1015   If so, return informations about the real device.
1016
1017   Args:
1018     disk: the objects.Disk instance
1019     allow_partial: don't abort the find if a child of the
1020                    device can't be found; this is intended to be
1021                    used when repairing mirrors
1022
1023   Returns:
1024     None if the device can't be found
1025     otherwise the device instance
1026
1027   """
1028   children = []
1029   if disk.children:
1030     for chdisk in disk.children:
1031       children.append(_RecursiveFindBD(chdisk))
1032
1033   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1034
1035
1036 def FindBlockDevice(disk):
1037   """Check if a device is activated.
1038
1039   If so, return informations about the real device.
1040
1041   Args:
1042     disk: the objects.Disk instance
1043   Returns:
1044     None if the device can't be found
1045     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1046
1047   """
1048   rbd = _RecursiveFindBD(disk)
1049   if rbd is None:
1050     return rbd
1051   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1052
1053
1054 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1055   """Write a file to the filesystem.
1056
1057   This allows the master to overwrite(!) a file. It will only perform
1058   the operation if the file belongs to a list of configuration files.
1059
1060   """
1061   if not os.path.isabs(file_name):
1062     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1063                   file_name)
1064     return False
1065
1066   allowed_files = [
1067     constants.CLUSTER_CONF_FILE,
1068     constants.ETC_HOSTS,
1069     constants.SSH_KNOWN_HOSTS_FILE,
1070     constants.VNC_PASSWORD_FILE,
1071     ]
1072
1073   if file_name not in allowed_files:
1074     logging.error("Filename passed to UploadFile not in allowed"
1075                  " upload targets: '%s'", file_name)
1076     return False
1077
1078   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1079                   atime=atime, mtime=mtime)
1080   return True
1081
1082
1083 def _ErrnoOrStr(err):
1084   """Format an EnvironmentError exception.
1085
1086   If the `err` argument has an errno attribute, it will be looked up
1087   and converted into a textual EXXXX description. Otherwise the string
1088   representation of the error will be returned.
1089
1090   """
1091   if hasattr(err, 'errno'):
1092     detail = errno.errorcode[err.errno]
1093   else:
1094     detail = str(err)
1095   return detail
1096
1097
1098 def _OSOndiskVersion(name, os_dir):
1099   """Compute and return the API version of a given OS.
1100
1101   This function will try to read the API version of the os given by
1102   the 'name' parameter and residing in the 'os_dir' directory.
1103
1104   Return value will be either an integer denoting the version or None in the
1105   case when this is not a valid OS name.
1106
1107   """
1108   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1109
1110   try:
1111     st = os.stat(api_file)
1112   except EnvironmentError, err:
1113     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1114                            " found (%s)" % _ErrnoOrStr(err))
1115
1116   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1117     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1118                            " a regular file")
1119
1120   try:
1121     f = open(api_file)
1122     try:
1123       api_versions = f.readlines()
1124     finally:
1125       f.close()
1126   except EnvironmentError, err:
1127     raise errors.InvalidOS(name, os_dir, "error while reading the"
1128                            " API version (%s)" % _ErrnoOrStr(err))
1129
1130   api_versions = [version.strip() for version in api_versions]
1131   try:
1132     api_versions = [int(version) for version in api_versions]
1133   except (TypeError, ValueError), err:
1134     raise errors.InvalidOS(name, os_dir,
1135                            "API version is not integer (%s)" % str(err))
1136
1137   return api_versions
1138
1139
1140 def DiagnoseOS(top_dirs=None):
1141   """Compute the validity for all OSes.
1142
1143   Returns an OS object for each name in all the given top directories
1144   (if not given defaults to constants.OS_SEARCH_PATH)
1145
1146   Returns:
1147     list of OS objects
1148
1149   """
1150   if top_dirs is None:
1151     top_dirs = constants.OS_SEARCH_PATH
1152
1153   result = []
1154   for dir_name in top_dirs:
1155     if os.path.isdir(dir_name):
1156       try:
1157         f_names = utils.ListVisibleFiles(dir_name)
1158       except EnvironmentError, err:
1159         logging.exception("Can't list the OS directory %s", dir_name)
1160         break
1161       for name in f_names:
1162         try:
1163           os_inst = OSFromDisk(name, base_dir=dir_name)
1164           result.append(os_inst)
1165         except errors.InvalidOS, err:
1166           result.append(objects.OS.FromInvalidOS(err))
1167
1168   return result
1169
1170
1171 def OSFromDisk(name, base_dir=None):
1172   """Create an OS instance from disk.
1173
1174   This function will return an OS instance if the given name is a
1175   valid OS name. Otherwise, it will raise an appropriate
1176   `errors.InvalidOS` exception, detailing why this is not a valid
1177   OS.
1178
1179   @type base_dir: string
1180   @keyword base_dir: Base directory containing OS installations.
1181                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1182
1183   """
1184
1185   if base_dir is None:
1186     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1187     if os_dir is None:
1188       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1189   else:
1190     os_dir = os.path.sep.join([base_dir, name])
1191
1192   api_versions = _OSOndiskVersion(name, os_dir)
1193
1194   if constants.OS_API_VERSION not in api_versions:
1195     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1196                            " (found %s want %s)"
1197                            % (api_versions, constants.OS_API_VERSION))
1198
1199   # OS Scripts dictionary, we will populate it with the actual script names
1200   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1201
1202   for script in os_scripts:
1203     os_scripts[script] = os.path.sep.join([os_dir, script])
1204
1205     try:
1206       st = os.stat(os_scripts[script])
1207     except EnvironmentError, err:
1208       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1209                              (script, _ErrnoOrStr(err)))
1210
1211     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1212       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1213                              script)
1214
1215     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1216       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1217                              script)
1218
1219
1220   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1221                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1222                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1223                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1224                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1225                     api_versions=api_versions)
1226
1227 def OSEnvironment(instance, debug=0):
1228   """Calculate the environment for an os script.
1229
1230   @type instance: instance object
1231   @param instance: target instance for the os script run
1232   @type debug: integer
1233   @param debug: debug level (0 or 1, for os api 10)
1234   @rtype: dict
1235   @return: dict of environment variables
1236
1237   """
1238   result = {}
1239   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1240   result['INSTANCE_NAME'] = instance.name
1241   result['HYPERVISOR'] = instance.hypervisor
1242   result['DISK_COUNT'] = '%d' % len(instance.disks)
1243   result['NIC_COUNT'] = '%d' % len(instance.nics)
1244   result['DEBUG_LEVEL'] = '%d' % debug
1245   for idx, disk in enumerate(instance.disks):
1246     real_disk = _RecursiveFindBD(disk)
1247     if real_disk is None:
1248       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1249                                     str(disk))
1250     real_disk.Open()
1251     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1252     # FIXME: When disks will have read-only mode, populate this
1253     result['DISK_%d_ACCESS' % idx] = 'W'
1254     if constants.HV_DISK_TYPE in instance.hvparams:
1255       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1256         instance.hvparams[constants.HV_DISK_TYPE]
1257     if disk.dev_type in constants.LDS_BLOCK:
1258       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1259     elif disk.dev_type == constants.LD_FILE:
1260       result['DISK_%d_BACKEND_TYPE' % idx] = \
1261         'file:%s' % disk.physical_id[0]
1262   for idx, nic in enumerate(instance.nics):
1263     result['NIC_%d_MAC' % idx] = nic.mac
1264     if nic.ip:
1265       result['NIC_%d_IP' % idx] = nic.ip
1266     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1267     if constants.HV_NIC_TYPE in instance.hvparams:
1268       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1269         instance.hvparams[constants.HV_NIC_TYPE]
1270
1271   return result
1272
1273 def GrowBlockDevice(disk, amount):
1274   """Grow a stack of block devices.
1275
1276   This function is called recursively, with the childrens being the
1277   first one resize.
1278
1279   Args:
1280     disk: the disk to be grown
1281
1282   Returns: a tuple of (status, result), with:
1283     status: the result (true/false) of the operation
1284     result: the error message if the operation failed, otherwise not used
1285
1286   """
1287   r_dev = _RecursiveFindBD(disk)
1288   if r_dev is None:
1289     return False, "Cannot find block device %s" % (disk,)
1290
1291   try:
1292     r_dev.Grow(amount)
1293   except errors.BlockDeviceError, err:
1294     return False, str(err)
1295
1296   return True, None
1297
1298
1299 def SnapshotBlockDevice(disk):
1300   """Create a snapshot copy of a block device.
1301
1302   This function is called recursively, and the snapshot is actually created
1303   just for the leaf lvm backend device.
1304
1305   @type disk: L{objects.Disk}
1306   @param disk: the disk to be snapshotted
1307   @rtype: string
1308   @return: snapshot disk path
1309
1310   """
1311   if disk.children:
1312     if len(disk.children) == 1:
1313       # only one child, let's recurse on it
1314       return SnapshotBlockDevice(disk.children[0])
1315     else:
1316       # more than one child, choose one that matches
1317       for child in disk.children:
1318         if child.size == disk.size:
1319           # return implies breaking the loop
1320           return SnapshotBlockDevice(child)
1321   elif disk.dev_type == constants.LD_LV:
1322     r_dev = _RecursiveFindBD(disk)
1323     if r_dev is not None:
1324       # let's stay on the safe side and ask for the full size, for now
1325       return r_dev.Snapshot(disk.size)
1326     else:
1327       return None
1328   else:
1329     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1330                                  " '%s' of type '%s'" %
1331                                  (disk.unique_id, disk.dev_type))
1332
1333
1334 def ExportSnapshot(disk, dest_node, instance, cluster_name):
1335   """Export a block device snapshot to a remote node.
1336
1337   Args:
1338     disk: the snapshot block device
1339     dest_node: the node to send the image to
1340     instance: instance being exported
1341
1342   Returns:
1343     True if successful, False otherwise.
1344
1345   """
1346   # TODO(ultrotter): Import/Export still to be converted to OS API 10
1347   logging.error("Import/Export still to be converted to OS API 10")
1348   return False
1349
1350   inst_os = OSFromDisk(instance.os)
1351   export_script = inst_os.export_script
1352
1353   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1354                                      instance.name, int(time.time()))
1355   if not os.path.exists(constants.LOG_OS_DIR):
1356     os.mkdir(constants.LOG_OS_DIR, 0750)
1357
1358   real_os_dev = _RecursiveFindBD(disk)
1359   if real_os_dev is None:
1360     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1361                                   str(disk))
1362   real_os_dev.Open()
1363
1364   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1365   destfile = disk.physical_id[1]
1366
1367   # the target command is built out of three individual commands,
1368   # which are joined by pipes; we check each individual command for
1369   # valid parameters
1370
1371   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1372                                export_script, instance.name,
1373                                real_os_dev.dev_path, logfile)
1374
1375   comprcmd = "gzip"
1376
1377   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1378                                 destdir, destdir, destfile)
1379   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1380                                                    constants.GANETI_RUNAS,
1381                                                    destcmd)
1382
1383   # all commands have been checked, so we're safe to combine them
1384   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1385
1386   result = utils.RunCmd(command)
1387
1388   if result.failed:
1389     logging.error("os snapshot export command '%s' returned error: %s"
1390                   " output: %s", command, result.fail_reason, result.output)
1391     return False
1392
1393   return True
1394
1395
1396 def FinalizeExport(instance, snap_disks):
1397   """Write out the export configuration information.
1398
1399   Args:
1400     instance: instance configuration
1401     snap_disks: snapshot block devices
1402
1403   Returns:
1404     False in case of error, True otherwise.
1405
1406   """
1407   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1408   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1409
1410   config = objects.SerializableConfigParser()
1411
1412   config.add_section(constants.INISECT_EXP)
1413   config.set(constants.INISECT_EXP, 'version', '0')
1414   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1415   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1416   config.set(constants.INISECT_EXP, 'os', instance.os)
1417   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1418
1419   config.add_section(constants.INISECT_INS)
1420   config.set(constants.INISECT_INS, 'name', instance.name)
1421   config.set(constants.INISECT_INS, 'memory', '%d' %
1422              instance.beparams[constants.BE_MEMORY])
1423   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1424              instance.beparams[constants.BE_VCPUS])
1425   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1426
1427   nic_count = 0
1428   for nic_count, nic in enumerate(instance.nics):
1429     config.set(constants.INISECT_INS, 'nic%d_mac' %
1430                nic_count, '%s' % nic.mac)
1431     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1432     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1433                '%s' % nic.bridge)
1434   # TODO: redundant: on load can read nics until it doesn't exist
1435   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1436
1437   disk_count = 0
1438   for disk_count, disk in enumerate(snap_disks):
1439     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1440                ('%s' % disk.iv_name))
1441     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1442                ('%s' % disk.physical_id[1]))
1443     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1444                ('%d' % disk.size))
1445   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1446
1447   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1448   cfo = open(cff, 'w')
1449   try:
1450     config.write(cfo)
1451   finally:
1452     cfo.close()
1453
1454   shutil.rmtree(finaldestdir, True)
1455   shutil.move(destdir, finaldestdir)
1456
1457   return True
1458
1459
1460 def ExportInfo(dest):
1461   """Get export configuration information.
1462
1463   Args:
1464     dest: directory containing the export
1465
1466   Returns:
1467     A serializable config file containing the export info.
1468
1469   """
1470   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1471
1472   config = objects.SerializableConfigParser()
1473   config.read(cff)
1474
1475   if (not config.has_section(constants.INISECT_EXP) or
1476       not config.has_section(constants.INISECT_INS)):
1477     return None
1478
1479   return config
1480
1481
1482 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1483                          cluster_name):
1484   """Import an os image into an instance.
1485
1486   Args:
1487     instance: the instance object
1488     os_disk: the instance-visible name of the os device
1489     swap_disk: the instance-visible name of the swap device
1490     src_node: node holding the source image
1491     src_image: path to the source image on src_node
1492
1493   Returns:
1494     False in case of error, True otherwise.
1495
1496   """
1497   # TODO(ultrotter): Import/Export still to be converted to OS API 10
1498   logging.error("Import/Export still to be converted to OS API 10")
1499   return False
1500
1501   inst_os = OSFromDisk(instance.os)
1502   import_script = inst_os.import_script
1503
1504   os_device = instance.FindDisk(os_disk)
1505   if os_device is None:
1506     logging.error("Can't find this device-visible name '%s'", os_disk)
1507     return False
1508
1509   swap_device = instance.FindDisk(swap_disk)
1510   if swap_device is None:
1511     logging.error("Can't find this device-visible name '%s'", swap_disk)
1512     return False
1513
1514   real_os_dev = _RecursiveFindBD(os_device)
1515   if real_os_dev is None:
1516     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1517                                   str(os_device))
1518   real_os_dev.Open()
1519
1520   real_swap_dev = _RecursiveFindBD(swap_device)
1521   if real_swap_dev is None:
1522     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1523                                   str(swap_device))
1524   real_swap_dev.Open()
1525
1526   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1527                                         instance.name, int(time.time()))
1528   if not os.path.exists(constants.LOG_OS_DIR):
1529     os.mkdir(constants.LOG_OS_DIR, 0750)
1530
1531   destcmd = utils.BuildShellCmd('cat %s', src_image)
1532   remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1533                                                    constants.GANETI_RUNAS,
1534                                                    destcmd)
1535
1536   comprcmd = "gunzip"
1537   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1538                                inst_os.path, import_script, instance.name,
1539                                real_os_dev.dev_path, real_swap_dev.dev_path,
1540                                logfile)
1541
1542   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1543   env = {'HYPERVISOR': instance.hypervisor}
1544
1545   result = utils.RunCmd(command, env=env)
1546
1547   if result.failed:
1548     logging.error("os import command '%s' returned error: %s"
1549                   " output: %s", command, result.fail_reason, result.output)
1550     return False
1551
1552   return True
1553
1554
1555 def ListExports():
1556   """Return a list of exports currently available on this machine.
1557
1558   """
1559   if os.path.isdir(constants.EXPORT_DIR):
1560     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1561   else:
1562     return []
1563
1564
1565 def RemoveExport(export):
1566   """Remove an existing export from the node.
1567
1568   Args:
1569     export: the name of the export to remove
1570
1571   Returns:
1572     False in case of error, True otherwise.
1573
1574   """
1575   target = os.path.join(constants.EXPORT_DIR, export)
1576
1577   shutil.rmtree(target)
1578   # TODO: catch some of the relevant exceptions and provide a pretty
1579   # error message if rmtree fails.
1580
1581   return True
1582
1583
1584 def RenameBlockDevices(devlist):
1585   """Rename a list of block devices.
1586
1587   The devlist argument is a list of tuples (disk, new_logical,
1588   new_physical). The return value will be a combined boolean result
1589   (True only if all renames succeeded).
1590
1591   """
1592   result = True
1593   for disk, unique_id in devlist:
1594     dev = _RecursiveFindBD(disk)
1595     if dev is None:
1596       result = False
1597       continue
1598     try:
1599       old_rpath = dev.dev_path
1600       dev.Rename(unique_id)
1601       new_rpath = dev.dev_path
1602       if old_rpath != new_rpath:
1603         DevCacheManager.RemoveCache(old_rpath)
1604         # FIXME: we should add the new cache information here, like:
1605         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1606         # but we don't have the owner here - maybe parse from existing
1607         # cache? for now, we only lose lvm data when we rename, which
1608         # is less critical than DRBD or MD
1609     except errors.BlockDeviceError, err:
1610       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1611       result = False
1612   return result
1613
1614
1615 def _TransformFileStorageDir(file_storage_dir):
1616   """Checks whether given file_storage_dir is valid.
1617
1618   Checks wheter the given file_storage_dir is within the cluster-wide
1619   default file_storage_dir stored in SimpleStore. Only paths under that
1620   directory are allowed.
1621
1622   Args:
1623     file_storage_dir: string with path
1624
1625   Returns:
1626     normalized file_storage_dir (string) if valid, None otherwise
1627
1628   """
1629   cfg = _GetConfig()
1630   file_storage_dir = os.path.normpath(file_storage_dir)
1631   base_file_storage_dir = cfg.GetFileStorageDir()
1632   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1633       base_file_storage_dir):
1634     logging.error("file storage directory '%s' is not under base file"
1635                   " storage directory '%s'",
1636                   file_storage_dir, base_file_storage_dir)
1637     return None
1638   return file_storage_dir
1639
1640
1641 def CreateFileStorageDir(file_storage_dir):
1642   """Create file storage directory.
1643
1644   Args:
1645     file_storage_dir: string containing the path
1646
1647   Returns:
1648     tuple with first element a boolean indicating wheter dir
1649     creation was successful or not
1650
1651   """
1652   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1653   result = True,
1654   if not file_storage_dir:
1655     result = False,
1656   else:
1657     if os.path.exists(file_storage_dir):
1658       if not os.path.isdir(file_storage_dir):
1659         logging.error("'%s' is not a directory", file_storage_dir)
1660         result = False,
1661     else:
1662       try:
1663         os.makedirs(file_storage_dir, 0750)
1664       except OSError, err:
1665         logging.error("Cannot create file storage directory '%s': %s",
1666                       file_storage_dir, err)
1667         result = False,
1668   return result
1669
1670
1671 def RemoveFileStorageDir(file_storage_dir):
1672   """Remove file storage directory.
1673
1674   Remove it only if it's empty. If not log an error and return.
1675
1676   Args:
1677     file_storage_dir: string containing the path
1678
1679   Returns:
1680     tuple with first element a boolean indicating wheter dir
1681     removal was successful or not
1682
1683   """
1684   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1685   result = True,
1686   if not file_storage_dir:
1687     result = False,
1688   else:
1689     if os.path.exists(file_storage_dir):
1690       if not os.path.isdir(file_storage_dir):
1691         logging.error("'%s' is not a directory", file_storage_dir)
1692         result = False,
1693       # deletes dir only if empty, otherwise we want to return False
1694       try:
1695         os.rmdir(file_storage_dir)
1696       except OSError, err:
1697         logging.exception("Cannot remove file storage directory '%s'",
1698                           file_storage_dir)
1699         result = False,
1700   return result
1701
1702
1703 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1704   """Rename the file storage directory.
1705
1706   Args:
1707     old_file_storage_dir: string containing the old path
1708     new_file_storage_dir: string containing the new path
1709
1710   Returns:
1711     tuple with first element a boolean indicating wheter dir
1712     rename was successful or not
1713
1714   """
1715   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1716   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1717   result = True,
1718   if not old_file_storage_dir or not new_file_storage_dir:
1719     result = False,
1720   else:
1721     if not os.path.exists(new_file_storage_dir):
1722       if os.path.isdir(old_file_storage_dir):
1723         try:
1724           os.rename(old_file_storage_dir, new_file_storage_dir)
1725         except OSError, err:
1726           logging.exception("Cannot rename '%s' to '%s'",
1727                             old_file_storage_dir, new_file_storage_dir)
1728           result =  False,
1729       else:
1730         logging.error("'%s' is not a directory", old_file_storage_dir)
1731         result = False,
1732     else:
1733       if os.path.exists(old_file_storage_dir):
1734         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1735                       old_file_storage_dir, new_file_storage_dir)
1736         result = False,
1737   return result
1738
1739
1740 def _IsJobQueueFile(file_name):
1741   """Checks whether the given filename is in the queue directory.
1742
1743   """
1744   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1745   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1746
1747   if not result:
1748     logging.error("'%s' is not a file in the queue directory",
1749                   file_name)
1750
1751   return result
1752
1753
1754 def JobQueueUpdate(file_name, content):
1755   """Updates a file in the queue directory.
1756
1757   """
1758   if not _IsJobQueueFile(file_name):
1759     return False
1760
1761   # Write and replace the file atomically
1762   utils.WriteFile(file_name, data=content)
1763
1764   return True
1765
1766
1767 def JobQueueRename(old, new):
1768   """Renames a job queue file.
1769
1770   """
1771   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1772     return False
1773
1774   os.rename(old, new)
1775
1776   return True
1777
1778
1779 def JobQueueSetDrainFlag(drain_flag):
1780   """Set the drain flag for the queue.
1781
1782   This will set or unset the queue drain flag.
1783
1784   @type drain_flag: bool
1785   @param drain_flag: if True, will set the drain flag, otherwise reset it.
1786
1787   """
1788   if drain_flag:
1789     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1790   else:
1791     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1792
1793   return True
1794
1795
1796 def CloseBlockDevices(disks):
1797   """Closes the given block devices.
1798
1799   This means they will be switched to secondary mode (in case of DRBD).
1800
1801   """
1802   bdevs = []
1803   for cf in disks:
1804     rd = _RecursiveFindBD(cf)
1805     if rd is None:
1806       return (False, "Can't find device %s" % cf)
1807     bdevs.append(rd)
1808
1809   msg = []
1810   for rd in bdevs:
1811     try:
1812       rd.Close()
1813     except errors.BlockDeviceError, err:
1814       msg.append(str(err))
1815   if msg:
1816     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1817   else:
1818     return (True, "All devices secondary")
1819
1820
1821 def ValidateHVParams(hvname, hvparams):
1822   """Validates the given hypervisor parameters.
1823
1824   @type hvname: string
1825   @param hvname: the hypervisor name
1826   @type hvparams: dict
1827   @param hvparams: the hypervisor parameters to be validated
1828   @rtype: tuple (bool, str)
1829   @return: tuple of (success, message)
1830
1831   """
1832   try:
1833     hv_type = hypervisor.GetHypervisor(hvname)
1834     hv_type.ValidateParameters(hvparams)
1835     return (True, "Validation passed")
1836   except errors.HypervisorError, err:
1837     return (False, str(err))
1838
1839
1840 class HooksRunner(object):
1841   """Hook runner.
1842
1843   This class is instantiated on the node side (ganeti-noded) and not on
1844   the master side.
1845
1846   """
1847   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1848
1849   def __init__(self, hooks_base_dir=None):
1850     """Constructor for hooks runner.
1851
1852     Args:
1853       - hooks_base_dir: if not None, this overrides the
1854         constants.HOOKS_BASE_DIR (useful for unittests)
1855
1856     """
1857     if hooks_base_dir is None:
1858       hooks_base_dir = constants.HOOKS_BASE_DIR
1859     self._BASE_DIR = hooks_base_dir
1860
1861   @staticmethod
1862   def ExecHook(script, env):
1863     """Exec one hook script.
1864
1865     Args:
1866      - script: the full path to the script
1867      - env: the environment with which to exec the script
1868
1869     """
1870     # exec the process using subprocess and log the output
1871     fdstdin = None
1872     try:
1873       fdstdin = open("/dev/null", "r")
1874       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1875                                stderr=subprocess.STDOUT, close_fds=True,
1876                                shell=False, cwd="/", env=env)
1877       output = ""
1878       try:
1879         output = child.stdout.read(4096)
1880         child.stdout.close()
1881       except EnvironmentError, err:
1882         output += "Hook script error: %s" % str(err)
1883
1884       while True:
1885         try:
1886           result = child.wait()
1887           break
1888         except EnvironmentError, err:
1889           if err.errno == errno.EINTR:
1890             continue
1891           raise
1892     finally:
1893       # try not to leak fds
1894       for fd in (fdstdin, ):
1895         if fd is not None:
1896           try:
1897             fd.close()
1898           except EnvironmentError, err:
1899             # just log the error
1900             #logging.exception("Error while closing fd %s", fd)
1901             pass
1902
1903     return result == 0, output
1904
1905   def RunHooks(self, hpath, phase, env):
1906     """Run the scripts in the hooks directory.
1907
1908     This method will not be usually overriden by child opcodes.
1909
1910     """
1911     if phase == constants.HOOKS_PHASE_PRE:
1912       suffix = "pre"
1913     elif phase == constants.HOOKS_PHASE_POST:
1914       suffix = "post"
1915     else:
1916       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1917     rr = []
1918
1919     subdir = "%s-%s.d" % (hpath, suffix)
1920     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1921     try:
1922       dir_contents = utils.ListVisibleFiles(dir_name)
1923     except OSError, err:
1924       # must log
1925       return rr
1926
1927     # we use the standard python sort order,
1928     # so 00name is the recommended naming scheme
1929     dir_contents.sort()
1930     for relname in dir_contents:
1931       fname = os.path.join(dir_name, relname)
1932       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1933           self.RE_MASK.match(relname) is not None):
1934         rrval = constants.HKR_SKIP
1935         output = ""
1936       else:
1937         result, output = self.ExecHook(fname, env)
1938         if not result:
1939           rrval = constants.HKR_FAIL
1940         else:
1941           rrval = constants.HKR_SUCCESS
1942       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1943
1944     return rr
1945
1946
1947 class IAllocatorRunner(object):
1948   """IAllocator runner.
1949
1950   This class is instantiated on the node side (ganeti-noded) and not on
1951   the master side.
1952
1953   """
1954   def Run(self, name, idata):
1955     """Run an iallocator script.
1956
1957     Return value: tuple of:
1958        - run status (one of the IARUN_ constants)
1959        - stdout
1960        - stderr
1961        - fail reason (as from utils.RunResult)
1962
1963     """
1964     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1965                                   os.path.isfile)
1966     if alloc_script is None:
1967       return (constants.IARUN_NOTFOUND, None, None, None)
1968
1969     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1970     try:
1971       os.write(fd, idata)
1972       os.close(fd)
1973       result = utils.RunCmd([alloc_script, fin_name])
1974       if result.failed:
1975         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1976                 result.fail_reason)
1977     finally:
1978       os.unlink(fin_name)
1979
1980     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1981
1982
1983 class DevCacheManager(object):
1984   """Simple class for managing a cache of block device information.
1985
1986   """
1987   _DEV_PREFIX = "/dev/"
1988   _ROOT_DIR = constants.BDEV_CACHE_DIR
1989
1990   @classmethod
1991   def _ConvertPath(cls, dev_path):
1992     """Converts a /dev/name path to the cache file name.
1993
1994     This replaces slashes with underscores and strips the /dev
1995     prefix. It then returns the full path to the cache file
1996
1997     """
1998     if dev_path.startswith(cls._DEV_PREFIX):
1999       dev_path = dev_path[len(cls._DEV_PREFIX):]
2000     dev_path = dev_path.replace("/", "_")
2001     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2002     return fpath
2003
2004   @classmethod
2005   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2006     """Updates the cache information for a given device.
2007
2008     """
2009     if dev_path is None:
2010       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2011       return
2012     fpath = cls._ConvertPath(dev_path)
2013     if on_primary:
2014       state = "primary"
2015     else:
2016       state = "secondary"
2017     if iv_name is None:
2018       iv_name = "not_visible"
2019     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2020     try:
2021       utils.WriteFile(fpath, data=fdata)
2022     except EnvironmentError, err:
2023       logging.exception("Can't update bdev cache for %s", dev_path)
2024
2025   @classmethod
2026   def RemoveCache(cls, dev_path):
2027     """Remove data for a dev_path.
2028
2029     """
2030     if dev_path is None:
2031       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2032       return
2033     fpath = cls._ConvertPath(dev_path)
2034     try:
2035       utils.RemoveFile(fpath)
2036     except EnvironmentError, err:
2037       logging.exception("Can't update bdev cache for %s", dev_path)