Remove two fixed FIXME and convert one to TODO
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different informations about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = []
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError, err:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       raise
579
580   return results
581
582
583 def GetInstanceInfo(instance, hname):
584   """Gives back the informations about an instance as a dictionary.
585
586   @type instance: string
587   @param instance: the instance name
588   @type hname: string
589   @param hname: the hypervisor type of the instance
590
591   @rtype: dict
592   @return: dictionary with the following keys:
593       - memory: memory size of instance (int)
594       - state: xen state of instance (string)
595       - time: cpu time of instance (float)
596
597   """
598   output = {}
599
600   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
601   if iinfo is not None:
602     output['memory'] = iinfo[2]
603     output['state'] = iinfo[4]
604     output['time'] = iinfo[5]
605
606   return output
607
608
609 def GetInstanceMigratable(instance):
610   """Gives whether an instance can be migrated.
611
612   @type instance: L{objects.Instance}
613   @param instance: object representing the instance to be checked.
614
615   @rtype: tuple
616   @return: tuple of (result, description) where:
617       - result: whether the instance can be migrated or not
618       - description: a description of the issue, if relevant
619
620   """
621   hyper = hypervisor.GetHypervisor(instance.hypervisor)
622   if instance.name not in hyper.ListInstances():
623     return (False, 'not running')
624
625   for idx in range(len(instance.disks)):
626     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
627     if not os.path.islink(link_name):
628       return (False, 'not restarted since ganeti 1.2.5')
629
630   return (True, '')
631
632
633 def GetAllInstancesInfo(hypervisor_list):
634   """Gather data about all instances.
635
636   This is the equivalent of L{GetInstanceInfo}, except that it
637   computes data for all instances at once, thus being faster if one
638   needs data about more than one instance.
639
640   @type hypervisor_list: list
641   @param hypervisor_list: list of hypervisors to query for instance data
642
643   @rtype: dict
644   @return: dictionary of instance: data, with data having the following keys:
645       - memory: memory size of instance (int)
646       - state: xen state of instance (string)
647       - time: cpu time of instance (float)
648       - vcpus: the number of vcpus
649
650   """
651   output = {}
652
653   for hname in hypervisor_list:
654     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
655     if iinfo:
656       for name, inst_id, memory, vcpus, state, times in iinfo:
657         value = {
658           'memory': memory,
659           'vcpus': vcpus,
660           'state': state,
661           'time': times,
662           }
663         if name in output and output[name] != value:
664           raise errors.HypervisorError("Instance %s running duplicate"
665                                        " with different parameters" % name)
666         output[name] = value
667
668   return output
669
670
671 def InstanceOsAdd(instance):
672   """Add an OS to an instance.
673
674   @type instance: L{objects.Instance}
675   @param instance: Instance whose OS is to be installed
676   @rtype: boolean
677   @return: the success of the operation
678
679   """
680   try:
681     inst_os = OSFromDisk(instance.os)
682   except errors.InvalidOS, err:
683     os_name, os_dir, os_err = err.args
684     if os_dir is None:
685       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
686     else:
687       return (False, "Error parsing OS '%s' in directory %s: %s" %
688               (os_name, os_dir, os_err))
689
690   create_env = OSEnvironment(instance)
691
692   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
693                                      instance.name, int(time.time()))
694
695   result = utils.RunCmd([inst_os.create_script], env=create_env,
696                         cwd=inst_os.path, output=logfile,)
697   if result.failed:
698     logging.error("os create command '%s' returned error: %s, logfile: %s,"
699                   " output: %s", result.cmd, result.fail_reason, logfile,
700                   result.output)
701     lines = [utils.SafeEncode(val)
702              for val in utils.TailFile(logfile, lines=20)]
703     return (False, "OS create script failed (%s), last lines in the"
704             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
705
706   return (True, "Successfully installed")
707
708
709 def RunRenameInstance(instance, old_name):
710   """Run the OS rename script for an instance.
711
712   @type instance: L{objects.Instance}
713   @param instance: Instance whose OS is to be installed
714   @type old_name: string
715   @param old_name: previous instance name
716   @rtype: boolean
717   @return: the success of the operation
718
719   """
720   inst_os = OSFromDisk(instance.os)
721
722   rename_env = OSEnvironment(instance)
723   rename_env['OLD_INSTANCE_NAME'] = old_name
724
725   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
726                                            old_name,
727                                            instance.name, int(time.time()))
728
729   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
730                         cwd=inst_os.path, output=logfile)
731
732   if result.failed:
733     logging.error("os create command '%s' returned error: %s output: %s",
734                   result.cmd, result.fail_reason, result.output)
735     lines = [utils.SafeEncode(val)
736              for val in utils.TailFile(logfile, lines=20)]
737     return (False, "OS rename script failed (%s), last lines in the"
738             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
739
740   return (True, "Rename successful")
741
742
743 def _GetVGInfo(vg_name):
744   """Get informations about the volume group.
745
746   @type vg_name: str
747   @param vg_name: the volume group which we query
748   @rtype: dict
749   @return:
750     A dictionary with the following keys:
751       - C{vg_size} is the total size of the volume group in MiB
752       - C{vg_free} is the free size of the volume group in MiB
753       - C{pv_count} are the number of physical disks in that VG
754
755     If an error occurs during gathering of data, we return the same dict
756     with keys all set to None.
757
758   """
759   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
760
761   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
762                          "--nosuffix", "--units=m", "--separator=:", vg_name])
763
764   if retval.failed:
765     logging.error("volume group %s not present", vg_name)
766     return retdic
767   valarr = retval.stdout.strip().rstrip(':').split(':')
768   if len(valarr) == 3:
769     try:
770       retdic = {
771         "vg_size": int(round(float(valarr[0]), 0)),
772         "vg_free": int(round(float(valarr[1]), 0)),
773         "pv_count": int(valarr[2]),
774         }
775     except ValueError, err:
776       logging.exception("Fail to parse vgs output")
777   else:
778     logging.error("vgs output has the wrong number of fields (expected"
779                   " three): %s", str(valarr))
780   return retdic
781
782
783 def _GetBlockDevSymlinkPath(instance_name, idx):
784   return os.path.join(constants.DISK_LINKS_DIR,
785                       "%s:%d" % (instance_name, idx))
786
787
788 def _SymlinkBlockDev(instance_name, device_path, idx):
789   """Set up symlinks to a instance's block device.
790
791   This is an auxiliary function run when an instance is start (on the primary
792   node) or when an instance is migrated (on the target node).
793
794
795   @param instance_name: the name of the target instance
796   @param device_path: path of the physical block device, on the node
797   @param idx: the disk index
798   @return: absolute path to the disk's symlink
799
800   """
801   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
802   try:
803     os.symlink(device_path, link_name)
804   except OSError, err:
805     if err.errno == errno.EEXIST:
806       if (not os.path.islink(link_name) or
807           os.readlink(link_name) != device_path):
808         os.remove(link_name)
809         os.symlink(device_path, link_name)
810     else:
811       raise
812
813   return link_name
814
815
816 def _RemoveBlockDevLinks(instance_name, disks):
817   """Remove the block device symlinks belonging to the given instance.
818
819   """
820   for idx, disk in enumerate(disks):
821     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
822     if os.path.islink(link_name):
823       try:
824         os.remove(link_name)
825       except OSError:
826         logging.exception("Can't remove symlink '%s'", link_name)
827
828
829 def _GatherAndLinkBlockDevs(instance):
830   """Set up an instance's block device(s).
831
832   This is run on the primary node at instance startup. The block
833   devices must be already assembled.
834
835   @type instance: L{objects.Instance}
836   @param instance: the instance whose disks we shoul assemble
837   @rtype: list
838   @return: list of (disk_object, device_path)
839
840   """
841   block_devices = []
842   for idx, disk in enumerate(instance.disks):
843     device = _RecursiveFindBD(disk)
844     if device is None:
845       raise errors.BlockDeviceError("Block device '%s' is not set up." %
846                                     str(disk))
847     device.Open()
848     try:
849       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
850     except OSError, e:
851       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
852                                     e.strerror)
853
854     block_devices.append((disk, link_name))
855
856   return block_devices
857
858
859 def StartInstance(instance, extra_args):
860   """Start an instance.
861
862   @type instance: L{objects.Instance}
863   @param instance: the instance object
864   @rtype: boolean
865   @return: whether the startup was successful or not
866
867   """
868   running_instances = GetInstanceList([instance.hypervisor])
869
870   if instance.name in running_instances:
871     return (True, "Already running")
872
873   try:
874     block_devices = _GatherAndLinkBlockDevs(instance)
875     hyper = hypervisor.GetHypervisor(instance.hypervisor)
876     hyper.StartInstance(instance, block_devices, extra_args)
877   except errors.BlockDeviceError, err:
878     logging.exception("Failed to start instance")
879     return (False, "Block device error: %s" % str(err))
880   except errors.HypervisorError, err:
881     logging.exception("Failed to start instance")
882     _RemoveBlockDevLinks(instance.name, instance.disks)
883     return (False, "Hypervisor error: %s" % str(err))
884
885   return (True, "Instance started successfully")
886
887
888 def InstanceShutdown(instance):
889   """Shut an instance down.
890
891   @note: this functions uses polling with a hardcoded timeout.
892
893   @type instance: L{objects.Instance}
894   @param instance: the instance object
895   @rtype: boolean
896   @return: whether the startup was successful or not
897
898   """
899   hv_name = instance.hypervisor
900   running_instances = GetInstanceList([hv_name])
901
902   if instance.name not in running_instances:
903     return (True, "Instance already stopped")
904
905   hyper = hypervisor.GetHypervisor(hv_name)
906   try:
907     hyper.StopInstance(instance)
908   except errors.HypervisorError, err:
909     msg = "Failed to stop instance %s: %s" % (instance.name, err)
910     logging.error(msg)
911     return (False, msg)
912
913   # test every 10secs for 2min
914
915   time.sleep(1)
916   for dummy in range(11):
917     if instance.name not in GetInstanceList([hv_name]):
918       break
919     time.sleep(10)
920   else:
921     # the shutdown did not succeed
922     logging.error("Shutdown of '%s' unsuccessful, using destroy",
923                   instance.name)
924
925     try:
926       hyper.StopInstance(instance, force=True)
927     except errors.HypervisorError, err:
928       msg = "Failed to force stop instance %s: %s" % (instance.name, err)
929       logging.error(msg)
930       return (False, msg)
931
932     time.sleep(1)
933     if instance.name in GetInstanceList([hv_name]):
934       msg = ("Could not shutdown instance %s even by destroy" %
935              instance.name)
936       logging.error(msg)
937       return (False, msg)
938
939   _RemoveBlockDevLinks(instance.name, instance.disks)
940
941   return (True, "Instance has been shutdown successfully")
942
943
944 def InstanceReboot(instance, reboot_type, extra_args):
945   """Reboot an instance.
946
947   @type instance: L{objects.Instance}
948   @param instance: the instance object to reboot
949   @type reboot_type: str
950   @param reboot_type: the type of reboot, one the following
951     constants:
952       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
953         instance OS, do not recreate the VM
954       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
955         restart the VM (at the hypervisor level)
956       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
957         is not accepted here, since that mode is handled
958         differently
959   @rtype: boolean
960   @return: the success of the operation
961
962   """
963   running_instances = GetInstanceList([instance.hypervisor])
964
965   if instance.name not in running_instances:
966     msg = "Cannot reboot instance %s that is not running" % instance.name
967     logging.error(msg)
968     return (False, msg)
969
970   hyper = hypervisor.GetHypervisor(instance.hypervisor)
971   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
972     try:
973       hyper.RebootInstance(instance)
974     except errors.HypervisorError, err:
975       msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
976       logging.error(msg)
977       return (False, msg)
978   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
979     try:
980       InstanceShutdown(instance)
981       StartInstance(instance, extra_args)
982     except errors.HypervisorError, err:
983       msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
984       logging.error(msg)
985       return (False, msg)
986   else:
987     return (False, "Invalid reboot_type received: %s" % (reboot_type,))
988
989   return (True, "Reboot successful")
990
991
992 def MigrationInfo(instance):
993   """Gather information about an instance to be migrated.
994
995   @type instance: L{objects.Instance}
996   @param instance: the instance definition
997
998   """
999   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1000   try:
1001     info = hyper.MigrationInfo(instance)
1002   except errors.HypervisorError, err:
1003     msg = "Failed to fetch migration information"
1004     logging.exception(msg)
1005     return (False, '%s: %s' % (msg, err))
1006   return (True, info)
1007
1008
1009 def AcceptInstance(instance, info, target):
1010   """Prepare the node to accept an instance.
1011
1012   @type instance: L{objects.Instance}
1013   @param instance: the instance definition
1014   @type info: string/data (opaque)
1015   @param info: migration information, from the source node
1016   @type target: string
1017   @param target: target host (usually ip), on this node
1018
1019   """
1020   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1021   try:
1022     hyper.AcceptInstance(instance, info, target)
1023   except errors.HypervisorError, err:
1024     msg = "Failed to accept instance"
1025     logging.exception(msg)
1026     return (False, '%s: %s' % (msg, err))
1027   return (True, "Accept successfull")
1028
1029
1030 def FinalizeMigration(instance, info, success):
1031   """Finalize any preparation to accept an instance.
1032
1033   @type instance: L{objects.Instance}
1034   @param instance: the instance definition
1035   @type info: string/data (opaque)
1036   @param info: migration information, from the source node
1037   @type success: boolean
1038   @param success: whether the migration was a success or a failure
1039
1040   """
1041   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1042   try:
1043     hyper.FinalizeMigration(instance, info, success)
1044   except errors.HypervisorError, err:
1045     msg = "Failed to finalize migration"
1046     logging.exception(msg)
1047     return (False, '%s: %s' % (msg, err))
1048   return (True, "Migration Finalized")
1049
1050
1051 def MigrateInstance(instance, target, live):
1052   """Migrates an instance to another node.
1053
1054   @type instance: L{objects.Instance}
1055   @param instance: the instance definition
1056   @type target: string
1057   @param target: the target node name
1058   @type live: boolean
1059   @param live: whether the migration should be done live or not (the
1060       interpretation of this parameter is left to the hypervisor)
1061   @rtype: tuple
1062   @return: a tuple of (success, msg) where:
1063       - succes is a boolean denoting the success/failure of the operation
1064       - msg is a string with details in case of failure
1065
1066   """
1067   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1068
1069   try:
1070     hyper.MigrateInstance(instance.name, target, live)
1071   except errors.HypervisorError, err:
1072     msg = "Failed to migrate instance"
1073     logging.exception(msg)
1074     return (False, "%s: %s" % (msg, err))
1075   return (True, "Migration successfull")
1076
1077
1078 def BlockdevCreate(disk, size, owner, on_primary, info):
1079   """Creates a block device for an instance.
1080
1081   @type disk: L{objects.Disk}
1082   @param disk: the object describing the disk we should create
1083   @type size: int
1084   @param size: the size of the physical underlying device, in MiB
1085   @type owner: str
1086   @param owner: the name of the instance for which disk is created,
1087       used for device cache data
1088   @type on_primary: boolean
1089   @param on_primary:  indicates if it is the primary node or not
1090   @type info: string
1091   @param info: string that will be sent to the physical device
1092       creation, used for example to set (LVM) tags on LVs
1093
1094   @return: the new unique_id of the device (this can sometime be
1095       computed only after creation), or None. On secondary nodes,
1096       it's not required to return anything.
1097
1098   """
1099   clist = []
1100   if disk.children:
1101     for child in disk.children:
1102       try:
1103         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1104       except errors.BlockDeviceError, err:
1105         errmsg = "Can't assemble device %s: %s" % (child, err)
1106         logging.error(errmsg)
1107         return False, errmsg
1108       if on_primary or disk.AssembleOnSecondary():
1109         # we need the children open in case the device itself has to
1110         # be assembled
1111         try:
1112           crdev.Open()
1113         except errors.BlockDeviceError, err:
1114           errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1115           logging.error(errmsg)
1116           return False, errmsg
1117       clist.append(crdev)
1118
1119   try:
1120     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1121   except errors.BlockDeviceError, err:
1122     return False, "Can't create block device: %s" % str(err)
1123
1124   if on_primary or disk.AssembleOnSecondary():
1125     try:
1126       device.Assemble()
1127     except errors.BlockDeviceError, err:
1128       errmsg = ("Can't assemble device after creation, very"
1129                 " unusual event: %s" % str(err))
1130       logging.error(errmsg)
1131       return False, errmsg
1132     device.SetSyncSpeed(constants.SYNC_SPEED)
1133     if on_primary or disk.OpenOnSecondary():
1134       try:
1135         device.Open(force=True)
1136       except errors.BlockDeviceError, err:
1137         errmsg = ("Can't make device r/w after creation, very"
1138                   " unusual event: %s" % str(err))
1139         logging.error(errmsg)
1140         return False, errmsg
1141     DevCacheManager.UpdateCache(device.dev_path, owner,
1142                                 on_primary, disk.iv_name)
1143
1144   device.SetInfo(info)
1145
1146   physical_id = device.unique_id
1147   return True, physical_id
1148
1149
1150 def BlockdevRemove(disk):
1151   """Remove a block device.
1152
1153   @note: This is intended to be called recursively.
1154
1155   @type disk: L{objects.Disk}
1156   @param disk: the disk object we should remove
1157   @rtype: boolean
1158   @return: the success of the operation
1159
1160   """
1161   msgs = []
1162   result = True
1163   try:
1164     rdev = _RecursiveFindBD(disk)
1165   except errors.BlockDeviceError, err:
1166     # probably can't attach
1167     logging.info("Can't attach to device %s in remove", disk)
1168     rdev = None
1169   if rdev is not None:
1170     r_path = rdev.dev_path
1171     try:
1172       rdev.Remove()
1173     except errors.BlockDeviceError, err:
1174       msgs.append(str(err))
1175       result = False
1176     if result:
1177       DevCacheManager.RemoveCache(r_path)
1178
1179   if disk.children:
1180     for child in disk.children:
1181       c_status, c_msg = BlockdevRemove(child)
1182       result = result and c_status
1183       if c_msg: # not an empty message
1184         msgs.append(c_msg)
1185
1186   return (result, "; ".join(msgs))
1187
1188
1189 def _RecursiveAssembleBD(disk, owner, as_primary):
1190   """Activate a block device for an instance.
1191
1192   This is run on the primary and secondary nodes for an instance.
1193
1194   @note: this function is called recursively.
1195
1196   @type disk: L{objects.Disk}
1197   @param disk: the disk we try to assemble
1198   @type owner: str
1199   @param owner: the name of the instance which owns the disk
1200   @type as_primary: boolean
1201   @param as_primary: if we should make the block device
1202       read/write
1203
1204   @return: the assembled device or None (in case no device
1205       was assembled)
1206   @raise errors.BlockDeviceError: in case there is an error
1207       during the activation of the children or the device
1208       itself
1209
1210   """
1211   children = []
1212   if disk.children:
1213     mcn = disk.ChildrenNeeded()
1214     if mcn == -1:
1215       mcn = 0 # max number of Nones allowed
1216     else:
1217       mcn = len(disk.children) - mcn # max number of Nones
1218     for chld_disk in disk.children:
1219       try:
1220         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1221       except errors.BlockDeviceError, err:
1222         if children.count(None) >= mcn:
1223           raise
1224         cdev = None
1225         logging.error("Error in child activation (but continuing): %s",
1226                       str(err))
1227       children.append(cdev)
1228
1229   if as_primary or disk.AssembleOnSecondary():
1230     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1231     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1232     result = r_dev
1233     if as_primary or disk.OpenOnSecondary():
1234       r_dev.Open()
1235     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1236                                 as_primary, disk.iv_name)
1237
1238   else:
1239     result = True
1240   return result
1241
1242
1243 def BlockdevAssemble(disk, owner, as_primary):
1244   """Activate a block device for an instance.
1245
1246   This is a wrapper over _RecursiveAssembleBD.
1247
1248   @rtype: str or boolean
1249   @return: a C{/dev/...} path for primary nodes, and
1250       C{True} for secondary nodes
1251
1252   """
1253   status = True
1254   result = "no error information"
1255   try:
1256     result = _RecursiveAssembleBD(disk, owner, as_primary)
1257     if isinstance(result, bdev.BlockDev):
1258       result = result.dev_path
1259   except errors.BlockDeviceError, err:
1260     result = "Error while assembling disk: %s" % str(err)
1261     status = False
1262   return (status, result)
1263
1264
1265 def BlockdevShutdown(disk):
1266   """Shut down a block device.
1267
1268   First, if the device is assembled (Attach() is successfull), then
1269   the device is shutdown. Then the children of the device are
1270   shutdown.
1271
1272   This function is called recursively. Note that we don't cache the
1273   children or such, as oppossed to assemble, shutdown of different
1274   devices doesn't require that the upper device was active.
1275
1276   @type disk: L{objects.Disk}
1277   @param disk: the description of the disk we should
1278       shutdown
1279   @rtype: boolean
1280   @return: the success of the operation
1281
1282   """
1283   msgs = []
1284   result = True
1285   r_dev = _RecursiveFindBD(disk)
1286   if r_dev is not None:
1287     r_path = r_dev.dev_path
1288     try:
1289       r_dev.Shutdown()
1290       DevCacheManager.RemoveCache(r_path)
1291     except errors.BlockDeviceError, err:
1292       msgs.append(str(err))
1293       result = False
1294
1295   if disk.children:
1296     for child in disk.children:
1297       c_status, c_msg = BlockdevShutdown(child)
1298       result = result and c_status
1299       if c_msg: # not an empty message
1300         msgs.append(c_msg)
1301
1302   return (result, "; ".join(msgs))
1303
1304
1305 def BlockdevAddchildren(parent_cdev, new_cdevs):
1306   """Extend a mirrored block device.
1307
1308   @type parent_cdev: L{objects.Disk}
1309   @param parent_cdev: the disk to which we should add children
1310   @type new_cdevs: list of L{objects.Disk}
1311   @param new_cdevs: the list of children which we should add
1312   @rtype: boolean
1313   @return: the success of the operation
1314
1315   """
1316   parent_bdev = _RecursiveFindBD(parent_cdev)
1317   if parent_bdev is None:
1318     logging.error("Can't find parent device")
1319     return False
1320   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1321   if new_bdevs.count(None) > 0:
1322     logging.error("Can't find new device(s) to add: %s:%s",
1323                   new_bdevs, new_cdevs)
1324     return False
1325   parent_bdev.AddChildren(new_bdevs)
1326   return True
1327
1328
1329 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1330   """Shrink a mirrored block device.
1331
1332   @type parent_cdev: L{objects.Disk}
1333   @param parent_cdev: the disk from which we should remove children
1334   @type new_cdevs: list of L{objects.Disk}
1335   @param new_cdevs: the list of children which we should remove
1336   @rtype: boolean
1337   @return: the success of the operation
1338
1339   """
1340   parent_bdev = _RecursiveFindBD(parent_cdev)
1341   if parent_bdev is None:
1342     logging.error("Can't find parent in remove children: %s", parent_cdev)
1343     return False
1344   devs = []
1345   for disk in new_cdevs:
1346     rpath = disk.StaticDevPath()
1347     if rpath is None:
1348       bd = _RecursiveFindBD(disk)
1349       if bd is None:
1350         logging.error("Can't find dynamic device %s while removing children",
1351                       disk)
1352         return False
1353       else:
1354         devs.append(bd.dev_path)
1355     else:
1356       devs.append(rpath)
1357   parent_bdev.RemoveChildren(devs)
1358   return True
1359
1360
1361 def BlockdevGetmirrorstatus(disks):
1362   """Get the mirroring status of a list of devices.
1363
1364   @type disks: list of L{objects.Disk}
1365   @param disks: the list of disks which we should query
1366   @rtype: disk
1367   @return:
1368       a list of (mirror_done, estimated_time) tuples, which
1369       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1370   @raise errors.BlockDeviceError: if any of the disks cannot be
1371       found
1372
1373   """
1374   stats = []
1375   for dsk in disks:
1376     rbd = _RecursiveFindBD(dsk)
1377     if rbd is None:
1378       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1379     stats.append(rbd.CombinedSyncStatus())
1380   return stats
1381
1382
1383 def _RecursiveFindBD(disk):
1384   """Check if a device is activated.
1385
1386   If so, return informations about the real device.
1387
1388   @type disk: L{objects.Disk}
1389   @param disk: the disk object we need to find
1390
1391   @return: None if the device can't be found,
1392       otherwise the device instance
1393
1394   """
1395   children = []
1396   if disk.children:
1397     for chdisk in disk.children:
1398       children.append(_RecursiveFindBD(chdisk))
1399
1400   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1401
1402
1403 def BlockdevFind(disk):
1404   """Check if a device is activated.
1405
1406   If it is, return informations about the real device.
1407
1408   @type disk: L{objects.Disk}
1409   @param disk: the disk to find
1410   @rtype: None or tuple
1411   @return: None if the disk cannot be found, otherwise a
1412       tuple (device_path, major, minor, sync_percent,
1413       estimated_time, is_degraded)
1414
1415   """
1416   try:
1417     rbd = _RecursiveFindBD(disk)
1418   except errors.BlockDeviceError, err:
1419     return (False, str(err))
1420   if rbd is None:
1421     return (True, None)
1422   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1423
1424
1425 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1426   """Write a file to the filesystem.
1427
1428   This allows the master to overwrite(!) a file. It will only perform
1429   the operation if the file belongs to a list of configuration files.
1430
1431   @type file_name: str
1432   @param file_name: the target file name
1433   @type data: str
1434   @param data: the new contents of the file
1435   @type mode: int
1436   @param mode: the mode to give the file (can be None)
1437   @type uid: int
1438   @param uid: the owner of the file (can be -1 for default)
1439   @type gid: int
1440   @param gid: the group of the file (can be -1 for default)
1441   @type atime: float
1442   @param atime: the atime to set on the file (can be None)
1443   @type mtime: float
1444   @param mtime: the mtime to set on the file (can be None)
1445   @rtype: boolean
1446   @return: the success of the operation; errors are logged
1447       in the node daemon log
1448
1449   """
1450   if not os.path.isabs(file_name):
1451     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1452                   file_name)
1453     return False
1454
1455   allowed_files = [
1456     constants.CLUSTER_CONF_FILE,
1457     constants.ETC_HOSTS,
1458     constants.SSH_KNOWN_HOSTS_FILE,
1459     constants.VNC_PASSWORD_FILE,
1460     ]
1461
1462   if file_name not in allowed_files:
1463     logging.error("Filename passed to UploadFile not in allowed"
1464                  " upload targets: '%s'", file_name)
1465     return False
1466
1467   raw_data = _Decompress(data)
1468
1469   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1470                   atime=atime, mtime=mtime)
1471   return True
1472
1473
1474 def WriteSsconfFiles(values):
1475   """Update all ssconf files.
1476
1477   Wrapper around the SimpleStore.WriteFiles.
1478
1479   """
1480   ssconf.SimpleStore().WriteFiles(values)
1481
1482
1483 def _ErrnoOrStr(err):
1484   """Format an EnvironmentError exception.
1485
1486   If the L{err} argument has an errno attribute, it will be looked up
1487   and converted into a textual C{E...} description. Otherwise the
1488   string representation of the error will be returned.
1489
1490   @type err: L{EnvironmentError}
1491   @param err: the exception to format
1492
1493   """
1494   if hasattr(err, 'errno'):
1495     detail = errno.errorcode[err.errno]
1496   else:
1497     detail = str(err)
1498   return detail
1499
1500
1501 def _OSOndiskVersion(name, os_dir):
1502   """Compute and return the API version of a given OS.
1503
1504   This function will try to read the API version of the OS given by
1505   the 'name' parameter and residing in the 'os_dir' directory.
1506
1507   @type name: str
1508   @param name: the OS name we should look for
1509   @type os_dir: str
1510   @param os_dir: the directory inwhich we should look for the OS
1511   @rtype: int or None
1512   @return:
1513       Either an integer denoting the version or None in the
1514       case when this is not a valid OS name.
1515   @raise errors.InvalidOS: if the OS cannot be found
1516
1517   """
1518   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1519
1520   try:
1521     st = os.stat(api_file)
1522   except EnvironmentError, err:
1523     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1524                            " found (%s)" % _ErrnoOrStr(err))
1525
1526   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1527     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1528                            " a regular file")
1529
1530   try:
1531     f = open(api_file)
1532     try:
1533       api_versions = f.readlines()
1534     finally:
1535       f.close()
1536   except EnvironmentError, err:
1537     raise errors.InvalidOS(name, os_dir, "error while reading the"
1538                            " API version (%s)" % _ErrnoOrStr(err))
1539
1540   api_versions = [version.strip() for version in api_versions]
1541   try:
1542     api_versions = [int(version) for version in api_versions]
1543   except (TypeError, ValueError), err:
1544     raise errors.InvalidOS(name, os_dir,
1545                            "API version is not integer (%s)" % str(err))
1546
1547   return api_versions
1548
1549
1550 def DiagnoseOS(top_dirs=None):
1551   """Compute the validity for all OSes.
1552
1553   @type top_dirs: list
1554   @param top_dirs: the list of directories in which to
1555       search (if not given defaults to
1556       L{constants.OS_SEARCH_PATH})
1557   @rtype: list of L{objects.OS}
1558   @return: an OS object for each name in all the given
1559       directories
1560
1561   """
1562   if top_dirs is None:
1563     top_dirs = constants.OS_SEARCH_PATH
1564
1565   result = []
1566   for dir_name in top_dirs:
1567     if os.path.isdir(dir_name):
1568       try:
1569         f_names = utils.ListVisibleFiles(dir_name)
1570       except EnvironmentError, err:
1571         logging.exception("Can't list the OS directory %s", dir_name)
1572         break
1573       for name in f_names:
1574         try:
1575           os_inst = OSFromDisk(name, base_dir=dir_name)
1576           result.append(os_inst)
1577         except errors.InvalidOS, err:
1578           result.append(objects.OS.FromInvalidOS(err))
1579
1580   return result
1581
1582
1583 def OSFromDisk(name, base_dir=None):
1584   """Create an OS instance from disk.
1585
1586   This function will return an OS instance if the given name is a
1587   valid OS name. Otherwise, it will raise an appropriate
1588   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1589
1590   @type base_dir: string
1591   @keyword base_dir: Base directory containing OS installations.
1592                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1593   @rtype: L{objects.OS}
1594   @return: the OS instance if we find a valid one
1595   @raise errors.InvalidOS: if we don't find a valid OS
1596
1597   """
1598   if base_dir is None:
1599     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1600     if os_dir is None:
1601       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1602   else:
1603     os_dir = os.path.sep.join([base_dir, name])
1604
1605   api_versions = _OSOndiskVersion(name, os_dir)
1606
1607   if constants.OS_API_VERSION not in api_versions:
1608     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1609                            " (found %s want %s)"
1610                            % (api_versions, constants.OS_API_VERSION))
1611
1612   # OS Scripts dictionary, we will populate it with the actual script names
1613   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1614
1615   for script in os_scripts:
1616     os_scripts[script] = os.path.sep.join([os_dir, script])
1617
1618     try:
1619       st = os.stat(os_scripts[script])
1620     except EnvironmentError, err:
1621       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1622                              (script, _ErrnoOrStr(err)))
1623
1624     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1625       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1626                              script)
1627
1628     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1629       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1630                              script)
1631
1632
1633   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1634                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1635                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1636                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1637                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1638                     api_versions=api_versions)
1639
1640 def OSEnvironment(instance, debug=0):
1641   """Calculate the environment for an os script.
1642
1643   @type instance: L{objects.Instance}
1644   @param instance: target instance for the os script run
1645   @type debug: integer
1646   @param debug: debug level (0 or 1, for OS Api 10)
1647   @rtype: dict
1648   @return: dict of environment variables
1649   @raise errors.BlockDeviceError: if the block device
1650       cannot be found
1651
1652   """
1653   result = {}
1654   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1655   result['INSTANCE_NAME'] = instance.name
1656   result['INSTANCE_OS'] = instance.os
1657   result['HYPERVISOR'] = instance.hypervisor
1658   result['DISK_COUNT'] = '%d' % len(instance.disks)
1659   result['NIC_COUNT'] = '%d' % len(instance.nics)
1660   result['DEBUG_LEVEL'] = '%d' % debug
1661   for idx, disk in enumerate(instance.disks):
1662     real_disk = _RecursiveFindBD(disk)
1663     if real_disk is None:
1664       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1665                                     str(disk))
1666     real_disk.Open()
1667     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1668     result['DISK_%d_ACCESS' % idx] = disk.mode
1669     if constants.HV_DISK_TYPE in instance.hvparams:
1670       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1671         instance.hvparams[constants.HV_DISK_TYPE]
1672     if disk.dev_type in constants.LDS_BLOCK:
1673       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1674     elif disk.dev_type == constants.LD_FILE:
1675       result['DISK_%d_BACKEND_TYPE' % idx] = \
1676         'file:%s' % disk.physical_id[0]
1677   for idx, nic in enumerate(instance.nics):
1678     result['NIC_%d_MAC' % idx] = nic.mac
1679     if nic.ip:
1680       result['NIC_%d_IP' % idx] = nic.ip
1681     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1682     if constants.HV_NIC_TYPE in instance.hvparams:
1683       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1684         instance.hvparams[constants.HV_NIC_TYPE]
1685
1686   return result
1687
1688 def BlockdevGrow(disk, amount):
1689   """Grow a stack of block devices.
1690
1691   This function is called recursively, with the childrens being the
1692   first ones to resize.
1693
1694   @type disk: L{objects.Disk}
1695   @param disk: the disk to be grown
1696   @rtype: (status, result)
1697   @return: a tuple with the status of the operation
1698       (True/False), and the errors message if status
1699       is False
1700
1701   """
1702   r_dev = _RecursiveFindBD(disk)
1703   if r_dev is None:
1704     return False, "Cannot find block device %s" % (disk,)
1705
1706   try:
1707     r_dev.Grow(amount)
1708   except errors.BlockDeviceError, err:
1709     return False, str(err)
1710
1711   return True, None
1712
1713
1714 def BlockdevSnapshot(disk):
1715   """Create a snapshot copy of a block device.
1716
1717   This function is called recursively, and the snapshot is actually created
1718   just for the leaf lvm backend device.
1719
1720   @type disk: L{objects.Disk}
1721   @param disk: the disk to be snapshotted
1722   @rtype: string
1723   @return: snapshot disk path
1724
1725   """
1726   if disk.children:
1727     if len(disk.children) == 1:
1728       # only one child, let's recurse on it
1729       return BlockdevSnapshot(disk.children[0])
1730     else:
1731       # more than one child, choose one that matches
1732       for child in disk.children:
1733         if child.size == disk.size:
1734           # return implies breaking the loop
1735           return BlockdevSnapshot(child)
1736   elif disk.dev_type == constants.LD_LV:
1737     r_dev = _RecursiveFindBD(disk)
1738     if r_dev is not None:
1739       # let's stay on the safe side and ask for the full size, for now
1740       return r_dev.Snapshot(disk.size)
1741     else:
1742       return None
1743   else:
1744     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1745                                  " '%s' of type '%s'" %
1746                                  (disk.unique_id, disk.dev_type))
1747
1748
1749 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1750   """Export a block device snapshot to a remote node.
1751
1752   @type disk: L{objects.Disk}
1753   @param disk: the description of the disk to export
1754   @type dest_node: str
1755   @param dest_node: the destination node to export to
1756   @type instance: L{objects.Instance}
1757   @param instance: the instance object to whom the disk belongs
1758   @type cluster_name: str
1759   @param cluster_name: the cluster name, needed for SSH hostalias
1760   @type idx: int
1761   @param idx: the index of the disk in the instance's disk list,
1762       used to export to the OS scripts environment
1763   @rtype: boolean
1764   @return: the success of the operation
1765
1766   """
1767   export_env = OSEnvironment(instance)
1768
1769   inst_os = OSFromDisk(instance.os)
1770   export_script = inst_os.export_script
1771
1772   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1773                                      instance.name, int(time.time()))
1774   if not os.path.exists(constants.LOG_OS_DIR):
1775     os.mkdir(constants.LOG_OS_DIR, 0750)
1776   real_disk = _RecursiveFindBD(disk)
1777   if real_disk is None:
1778     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1779                                   str(disk))
1780   real_disk.Open()
1781
1782   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1783   export_env['EXPORT_INDEX'] = str(idx)
1784
1785   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1786   destfile = disk.physical_id[1]
1787
1788   # the target command is built out of three individual commands,
1789   # which are joined by pipes; we check each individual command for
1790   # valid parameters
1791   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1792                                export_script, logfile)
1793
1794   comprcmd = "gzip"
1795
1796   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1797                                 destdir, destdir, destfile)
1798   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1799                                                    constants.GANETI_RUNAS,
1800                                                    destcmd)
1801
1802   # all commands have been checked, so we're safe to combine them
1803   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1804
1805   result = utils.RunCmd(command, env=export_env)
1806
1807   if result.failed:
1808     logging.error("os snapshot export command '%s' returned error: %s"
1809                   " output: %s", command, result.fail_reason, result.output)
1810     return False
1811
1812   return True
1813
1814
1815 def FinalizeExport(instance, snap_disks):
1816   """Write out the export configuration information.
1817
1818   @type instance: L{objects.Instance}
1819   @param instance: the instance which we export, used for
1820       saving configuration
1821   @type snap_disks: list of L{objects.Disk}
1822   @param snap_disks: list of snapshot block devices, which
1823       will be used to get the actual name of the dump file
1824
1825   @rtype: boolean
1826   @return: the success of the operation
1827
1828   """
1829   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1830   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1831
1832   config = objects.SerializableConfigParser()
1833
1834   config.add_section(constants.INISECT_EXP)
1835   config.set(constants.INISECT_EXP, 'version', '0')
1836   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1837   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1838   config.set(constants.INISECT_EXP, 'os', instance.os)
1839   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1840
1841   config.add_section(constants.INISECT_INS)
1842   config.set(constants.INISECT_INS, 'name', instance.name)
1843   config.set(constants.INISECT_INS, 'memory', '%d' %
1844              instance.beparams[constants.BE_MEMORY])
1845   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1846              instance.beparams[constants.BE_VCPUS])
1847   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1848
1849   nic_total = 0
1850   for nic_count, nic in enumerate(instance.nics):
1851     nic_total += 1
1852     config.set(constants.INISECT_INS, 'nic%d_mac' %
1853                nic_count, '%s' % nic.mac)
1854     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1855     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1856                '%s' % nic.bridge)
1857   # TODO: redundant: on load can read nics until it doesn't exist
1858   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1859
1860   disk_total = 0
1861   for disk_count, disk in enumerate(snap_disks):
1862     if disk:
1863       disk_total += 1
1864       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1865                  ('%s' % disk.iv_name))
1866       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1867                  ('%s' % disk.physical_id[1]))
1868       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1869                  ('%d' % disk.size))
1870
1871   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1872
1873   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1874                   data=config.Dumps())
1875   shutil.rmtree(finaldestdir, True)
1876   shutil.move(destdir, finaldestdir)
1877
1878   return True
1879
1880
1881 def ExportInfo(dest):
1882   """Get export configuration information.
1883
1884   @type dest: str
1885   @param dest: directory containing the export
1886
1887   @rtype: L{objects.SerializableConfigParser}
1888   @return: a serializable config file containing the
1889       export info
1890
1891   """
1892   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1893
1894   config = objects.SerializableConfigParser()
1895   config.read(cff)
1896
1897   if (not config.has_section(constants.INISECT_EXP) or
1898       not config.has_section(constants.INISECT_INS)):
1899     return None
1900
1901   return config
1902
1903
1904 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1905   """Import an os image into an instance.
1906
1907   @type instance: L{objects.Instance}
1908   @param instance: instance to import the disks into
1909   @type src_node: string
1910   @param src_node: source node for the disk images
1911   @type src_images: list of string
1912   @param src_images: absolute paths of the disk images
1913   @rtype: list of boolean
1914   @return: each boolean represent the success of importing the n-th disk
1915
1916   """
1917   import_env = OSEnvironment(instance)
1918   inst_os = OSFromDisk(instance.os)
1919   import_script = inst_os.import_script
1920
1921   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1922                                         instance.name, int(time.time()))
1923   if not os.path.exists(constants.LOG_OS_DIR):
1924     os.mkdir(constants.LOG_OS_DIR, 0750)
1925
1926   comprcmd = "gunzip"
1927   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1928                                import_script, logfile)
1929
1930   final_result = []
1931   for idx, image in enumerate(src_images):
1932     if image:
1933       destcmd = utils.BuildShellCmd('cat %s', image)
1934       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1935                                                        constants.GANETI_RUNAS,
1936                                                        destcmd)
1937       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1938       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1939       import_env['IMPORT_INDEX'] = str(idx)
1940       result = utils.RunCmd(command, env=import_env)
1941       if result.failed:
1942         logging.error("Disk import command '%s' returned error: %s"
1943                       " output: %s", command, result.fail_reason,
1944                       result.output)
1945         final_result.append(False)
1946       else:
1947         final_result.append(True)
1948     else:
1949       final_result.append(True)
1950
1951   return final_result
1952
1953
1954 def ListExports():
1955   """Return a list of exports currently available on this machine.
1956
1957   @rtype: list
1958   @return: list of the exports
1959
1960   """
1961   if os.path.isdir(constants.EXPORT_DIR):
1962     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1963   else:
1964     return []
1965
1966
1967 def RemoveExport(export):
1968   """Remove an existing export from the node.
1969
1970   @type export: str
1971   @param export: the name of the export to remove
1972   @rtype: boolean
1973   @return: the success of the operation
1974
1975   """
1976   target = os.path.join(constants.EXPORT_DIR, export)
1977
1978   shutil.rmtree(target)
1979   # TODO: catch some of the relevant exceptions and provide a pretty
1980   # error message if rmtree fails.
1981
1982   return True
1983
1984
1985 def BlockdevRename(devlist):
1986   """Rename a list of block devices.
1987
1988   @type devlist: list of tuples
1989   @param devlist: list of tuples of the form  (disk,
1990       new_logical_id, new_physical_id); disk is an
1991       L{objects.Disk} object describing the current disk,
1992       and new logical_id/physical_id is the name we
1993       rename it to
1994   @rtype: boolean
1995   @return: True if all renames succeeded, False otherwise
1996
1997   """
1998   result = True
1999   for disk, unique_id in devlist:
2000     dev = _RecursiveFindBD(disk)
2001     if dev is None:
2002       result = False
2003       continue
2004     try:
2005       old_rpath = dev.dev_path
2006       dev.Rename(unique_id)
2007       new_rpath = dev.dev_path
2008       if old_rpath != new_rpath:
2009         DevCacheManager.RemoveCache(old_rpath)
2010         # FIXME: we should add the new cache information here, like:
2011         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2012         # but we don't have the owner here - maybe parse from existing
2013         # cache? for now, we only lose lvm data when we rename, which
2014         # is less critical than DRBD or MD
2015     except errors.BlockDeviceError, err:
2016       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2017       result = False
2018   return result
2019
2020
2021 def _TransformFileStorageDir(file_storage_dir):
2022   """Checks whether given file_storage_dir is valid.
2023
2024   Checks wheter the given file_storage_dir is within the cluster-wide
2025   default file_storage_dir stored in SimpleStore. Only paths under that
2026   directory are allowed.
2027
2028   @type file_storage_dir: str
2029   @param file_storage_dir: the path to check
2030
2031   @return: the normalized path if valid, None otherwise
2032
2033   """
2034   cfg = _GetConfig()
2035   file_storage_dir = os.path.normpath(file_storage_dir)
2036   base_file_storage_dir = cfg.GetFileStorageDir()
2037   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2038       base_file_storage_dir):
2039     logging.error("file storage directory '%s' is not under base file"
2040                   " storage directory '%s'",
2041                   file_storage_dir, base_file_storage_dir)
2042     return None
2043   return file_storage_dir
2044
2045
2046 def CreateFileStorageDir(file_storage_dir):
2047   """Create file storage directory.
2048
2049   @type file_storage_dir: str
2050   @param file_storage_dir: directory to create
2051
2052   @rtype: tuple
2053   @return: tuple with first element a boolean indicating wheter dir
2054       creation was successful or not
2055
2056   """
2057   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2058   result = True,
2059   if not file_storage_dir:
2060     result = False,
2061   else:
2062     if os.path.exists(file_storage_dir):
2063       if not os.path.isdir(file_storage_dir):
2064         logging.error("'%s' is not a directory", file_storage_dir)
2065         result = False,
2066     else:
2067       try:
2068         os.makedirs(file_storage_dir, 0750)
2069       except OSError, err:
2070         logging.error("Cannot create file storage directory '%s': %s",
2071                       file_storage_dir, err)
2072         result = False,
2073   return result
2074
2075
2076 def RemoveFileStorageDir(file_storage_dir):
2077   """Remove file storage directory.
2078
2079   Remove it only if it's empty. If not log an error and return.
2080
2081   @type file_storage_dir: str
2082   @param file_storage_dir: the directory we should cleanup
2083   @rtype: tuple (success,)
2084   @return: tuple of one element, C{success}, denoting
2085       whether the operation was successfull
2086
2087   """
2088   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2089   result = True,
2090   if not file_storage_dir:
2091     result = False,
2092   else:
2093     if os.path.exists(file_storage_dir):
2094       if not os.path.isdir(file_storage_dir):
2095         logging.error("'%s' is not a directory", file_storage_dir)
2096         result = False,
2097       # deletes dir only if empty, otherwise we want to return False
2098       try:
2099         os.rmdir(file_storage_dir)
2100       except OSError, err:
2101         logging.exception("Cannot remove file storage directory '%s'",
2102                           file_storage_dir)
2103         result = False,
2104   return result
2105
2106
2107 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2108   """Rename the file storage directory.
2109
2110   @type old_file_storage_dir: str
2111   @param old_file_storage_dir: the current path
2112   @type new_file_storage_dir: str
2113   @param new_file_storage_dir: the name we should rename to
2114   @rtype: tuple (success,)
2115   @return: tuple of one element, C{success}, denoting
2116       whether the operation was successful
2117
2118   """
2119   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2120   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2121   result = True,
2122   if not old_file_storage_dir or not new_file_storage_dir:
2123     result = False,
2124   else:
2125     if not os.path.exists(new_file_storage_dir):
2126       if os.path.isdir(old_file_storage_dir):
2127         try:
2128           os.rename(old_file_storage_dir, new_file_storage_dir)
2129         except OSError, err:
2130           logging.exception("Cannot rename '%s' to '%s'",
2131                             old_file_storage_dir, new_file_storage_dir)
2132           result =  False,
2133       else:
2134         logging.error("'%s' is not a directory", old_file_storage_dir)
2135         result = False,
2136     else:
2137       if os.path.exists(old_file_storage_dir):
2138         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2139                       old_file_storage_dir, new_file_storage_dir)
2140         result = False,
2141   return result
2142
2143
2144 def _IsJobQueueFile(file_name):
2145   """Checks whether the given filename is in the queue directory.
2146
2147   @type file_name: str
2148   @param file_name: the file name we should check
2149   @rtype: boolean
2150   @return: whether the file is under the queue directory
2151
2152   """
2153   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2154   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2155
2156   if not result:
2157     logging.error("'%s' is not a file in the queue directory",
2158                   file_name)
2159
2160   return result
2161
2162
2163 def JobQueueUpdate(file_name, content):
2164   """Updates a file in the queue directory.
2165
2166   This is just a wrapper over L{utils.WriteFile}, with proper
2167   checking.
2168
2169   @type file_name: str
2170   @param file_name: the job file name
2171   @type content: str
2172   @param content: the new job contents
2173   @rtype: boolean
2174   @return: the success of the operation
2175
2176   """
2177   if not _IsJobQueueFile(file_name):
2178     return False
2179
2180   # Write and replace the file atomically
2181   utils.WriteFile(file_name, data=_Decompress(content))
2182
2183   return True
2184
2185
2186 def JobQueueRename(old, new):
2187   """Renames a job queue file.
2188
2189   This is just a wrapper over os.rename with proper checking.
2190
2191   @type old: str
2192   @param old: the old (actual) file name
2193   @type new: str
2194   @param new: the desired file name
2195   @rtype: boolean
2196   @return: the success of the operation
2197
2198   """
2199   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2200     return False
2201
2202   utils.RenameFile(old, new, mkdir=True)
2203
2204   return True
2205
2206
2207 def JobQueueSetDrainFlag(drain_flag):
2208   """Set the drain flag for the queue.
2209
2210   This will set or unset the queue drain flag.
2211
2212   @type drain_flag: boolean
2213   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2214   @rtype: boolean
2215   @return: always True
2216   @warning: the function always returns True
2217
2218   """
2219   if drain_flag:
2220     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2221   else:
2222     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2223
2224   return True
2225
2226
2227 def BlockdevClose(instance_name, disks):
2228   """Closes the given block devices.
2229
2230   This means they will be switched to secondary mode (in case of
2231   DRBD).
2232
2233   @param instance_name: if the argument is not empty, the symlinks
2234       of this instance will be removed
2235   @type disks: list of L{objects.Disk}
2236   @param disks: the list of disks to be closed
2237   @rtype: tuple (success, message)
2238   @return: a tuple of success and message, where success
2239       indicates the succes of the operation, and message
2240       which will contain the error details in case we
2241       failed
2242
2243   """
2244   bdevs = []
2245   for cf in disks:
2246     rd = _RecursiveFindBD(cf)
2247     if rd is None:
2248       return (False, "Can't find device %s" % cf)
2249     bdevs.append(rd)
2250
2251   msg = []
2252   for rd in bdevs:
2253     try:
2254       rd.Close()
2255     except errors.BlockDeviceError, err:
2256       msg.append(str(err))
2257   if msg:
2258     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2259   else:
2260     if instance_name:
2261       _RemoveBlockDevLinks(instance_name, disks)
2262     return (True, "All devices secondary")
2263
2264
2265 def ValidateHVParams(hvname, hvparams):
2266   """Validates the given hypervisor parameters.
2267
2268   @type hvname: string
2269   @param hvname: the hypervisor name
2270   @type hvparams: dict
2271   @param hvparams: the hypervisor parameters to be validated
2272   @rtype: tuple (success, message)
2273   @return: a tuple of success and message, where success
2274       indicates the succes of the operation, and message
2275       which will contain the error details in case we
2276       failed
2277
2278   """
2279   try:
2280     hv_type = hypervisor.GetHypervisor(hvname)
2281     hv_type.ValidateParameters(hvparams)
2282     return (True, "Validation passed")
2283   except errors.HypervisorError, err:
2284     return (False, str(err))
2285
2286
2287 def DemoteFromMC():
2288   """Demotes the current node from master candidate role.
2289
2290   """
2291   # try to ensure we're not the master by mistake
2292   master, myself = ssconf.GetMasterAndMyself()
2293   if master == myself:
2294     return (False, "ssconf status shows I'm the master node, will not demote")
2295   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2296   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2297     return (False, "The master daemon is running, will not demote")
2298   try:
2299     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2300   except EnvironmentError, err:
2301     if err.errno != errno.ENOENT:
2302       return (False, "Error while backing up cluster file: %s" % str(err))
2303   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2304   return (True, "Done")
2305
2306
2307 def _FindDisks(nodes_ip, disks):
2308   """Sets the physical ID on disks and returns the block devices.
2309
2310   """
2311   # set the correct physical ID
2312   my_name = utils.HostInfo().name
2313   for cf in disks:
2314     cf.SetPhysicalID(my_name, nodes_ip)
2315
2316   bdevs = []
2317
2318   for cf in disks:
2319     rd = _RecursiveFindBD(cf)
2320     if rd is None:
2321       return (False, "Can't find device %s" % cf)
2322     bdevs.append(rd)
2323   return (True, bdevs)
2324
2325
2326 def DrbdDisconnectNet(nodes_ip, disks):
2327   """Disconnects the network on a list of drbd devices.
2328
2329   """
2330   status, bdevs = _FindDisks(nodes_ip, disks)
2331   if not status:
2332     return status, bdevs
2333
2334   # disconnect disks
2335   for rd in bdevs:
2336     try:
2337       rd.DisconnectNet()
2338     except errors.BlockDeviceError, err:
2339       logging.exception("Failed to go into standalone mode")
2340       return (False, "Can't change network configuration: %s" % str(err))
2341   return (True, "All disks are now disconnected")
2342
2343
2344 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2345   """Attaches the network on a list of drbd devices.
2346
2347   """
2348   status, bdevs = _FindDisks(nodes_ip, disks)
2349   if not status:
2350     return status, bdevs
2351
2352   if multimaster:
2353     for idx, rd in enumerate(bdevs):
2354       try:
2355         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2356       except EnvironmentError, err:
2357         return (False, "Can't create symlink: %s" % str(err))
2358   # reconnect disks, switch to new master configuration and if
2359   # needed primary mode
2360   for rd in bdevs:
2361     try:
2362       rd.AttachNet(multimaster)
2363     except errors.BlockDeviceError, err:
2364       return (False, "Can't change network configuration: %s" % str(err))
2365   # wait until the disks are connected; we need to retry the re-attach
2366   # if the device becomes standalone, as this might happen if the one
2367   # node disconnects and reconnects in a different mode before the
2368   # other node reconnects; in this case, one or both of the nodes will
2369   # decide it has wrong configuration and switch to standalone
2370   RECONNECT_TIMEOUT = 2 * 60
2371   sleep_time = 0.100 # start with 100 miliseconds
2372   timeout_limit = time.time() + RECONNECT_TIMEOUT
2373   while time.time() < timeout_limit:
2374     all_connected = True
2375     for rd in bdevs:
2376       stats = rd.GetProcStatus()
2377       if not (stats.is_connected or stats.is_in_resync):
2378         all_connected = False
2379       if stats.is_standalone:
2380         # peer had different config info and this node became
2381         # standalone, even though this should not happen with the
2382         # new staged way of changing disk configs
2383         try:
2384           rd.ReAttachNet(multimaster)
2385         except errors.BlockDeviceError, err:
2386           return (False, "Can't change network configuration: %s" % str(err))
2387     if all_connected:
2388       break
2389     time.sleep(sleep_time)
2390     sleep_time = min(5, sleep_time * 1.5)
2391   if not all_connected:
2392     return (False, "Timeout in disk reconnecting")
2393   if multimaster:
2394     # change to primary mode
2395     for rd in bdevs:
2396       try:
2397         rd.Open()
2398       except errors.BlockDeviceError, err:
2399         return (False, "Can't change to primary mode: %s" % str(err))
2400   if multimaster:
2401     msg = "multi-master and primary"
2402   else:
2403     msg = "single-master"
2404   return (True, "Disks are now configured as %s" % msg)
2405
2406
2407 def DrbdWaitSync(nodes_ip, disks):
2408   """Wait until DRBDs have synchronized.
2409
2410   """
2411   status, bdevs = _FindDisks(nodes_ip, disks)
2412   if not status:
2413     return status, bdevs
2414
2415   min_resync = 100
2416   alldone = True
2417   failure = False
2418   for rd in bdevs:
2419     stats = rd.GetProcStatus()
2420     if not (stats.is_connected or stats.is_in_resync):
2421       failure = True
2422       break
2423     alldone = alldone and (not stats.is_in_resync)
2424     if stats.sync_percent is not None:
2425       min_resync = min(min_resync, stats.sync_percent)
2426   return (not failure, (alldone, min_resync))
2427
2428
2429 class HooksRunner(object):
2430   """Hook runner.
2431
2432   This class is instantiated on the node side (ganeti-noded) and not
2433   on the master side.
2434
2435   """
2436   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2437
2438   def __init__(self, hooks_base_dir=None):
2439     """Constructor for hooks runner.
2440
2441     @type hooks_base_dir: str or None
2442     @param hooks_base_dir: if not None, this overrides the
2443         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2444
2445     """
2446     if hooks_base_dir is None:
2447       hooks_base_dir = constants.HOOKS_BASE_DIR
2448     self._BASE_DIR = hooks_base_dir
2449
2450   @staticmethod
2451   def ExecHook(script, env):
2452     """Exec one hook script.
2453
2454     @type script: str
2455     @param script: the full path to the script
2456     @type env: dict
2457     @param env: the environment with which to exec the script
2458     @rtype: tuple (success, message)
2459     @return: a tuple of success and message, where success
2460         indicates the succes of the operation, and message
2461         which will contain the error details in case we
2462         failed
2463
2464     """
2465     # exec the process using subprocess and log the output
2466     fdstdin = None
2467     try:
2468       fdstdin = open("/dev/null", "r")
2469       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2470                                stderr=subprocess.STDOUT, close_fds=True,
2471                                shell=False, cwd="/", env=env)
2472       output = ""
2473       try:
2474         output = child.stdout.read(4096)
2475         child.stdout.close()
2476       except EnvironmentError, err:
2477         output += "Hook script error: %s" % str(err)
2478
2479       while True:
2480         try:
2481           result = child.wait()
2482           break
2483         except EnvironmentError, err:
2484           if err.errno == errno.EINTR:
2485             continue
2486           raise
2487     finally:
2488       # try not to leak fds
2489       for fd in (fdstdin, ):
2490         if fd is not None:
2491           try:
2492             fd.close()
2493           except EnvironmentError, err:
2494             # just log the error
2495             #logging.exception("Error while closing fd %s", fd)
2496             pass
2497
2498     return result == 0, utils.SafeEncode(output.strip())
2499
2500   def RunHooks(self, hpath, phase, env):
2501     """Run the scripts in the hooks directory.
2502
2503     @type hpath: str
2504     @param hpath: the path to the hooks directory which
2505         holds the scripts
2506     @type phase: str
2507     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2508         L{constants.HOOKS_PHASE_POST}
2509     @type env: dict
2510     @param env: dictionary with the environment for the hook
2511     @rtype: list
2512     @return: list of 3-element tuples:
2513       - script path
2514       - script result, either L{constants.HKR_SUCCESS} or
2515         L{constants.HKR_FAIL}
2516       - output of the script
2517
2518     @raise errors.ProgrammerError: for invalid input
2519         parameters
2520
2521     """
2522     if phase == constants.HOOKS_PHASE_PRE:
2523       suffix = "pre"
2524     elif phase == constants.HOOKS_PHASE_POST:
2525       suffix = "post"
2526     else:
2527       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2528     rr = []
2529
2530     subdir = "%s-%s.d" % (hpath, suffix)
2531     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2532     try:
2533       dir_contents = utils.ListVisibleFiles(dir_name)
2534     except OSError, err:
2535       # FIXME: must log output in case of failures
2536       return rr
2537
2538     # we use the standard python sort order,
2539     # so 00name is the recommended naming scheme
2540     dir_contents.sort()
2541     for relname in dir_contents:
2542       fname = os.path.join(dir_name, relname)
2543       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2544           self.RE_MASK.match(relname) is not None):
2545         rrval = constants.HKR_SKIP
2546         output = ""
2547       else:
2548         result, output = self.ExecHook(fname, env)
2549         if not result:
2550           rrval = constants.HKR_FAIL
2551         else:
2552           rrval = constants.HKR_SUCCESS
2553       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2554
2555     return rr
2556
2557
2558 class IAllocatorRunner(object):
2559   """IAllocator runner.
2560
2561   This class is instantiated on the node side (ganeti-noded) and not on
2562   the master side.
2563
2564   """
2565   def Run(self, name, idata):
2566     """Run an iallocator script.
2567
2568     @type name: str
2569     @param name: the iallocator script name
2570     @type idata: str
2571     @param idata: the allocator input data
2572
2573     @rtype: tuple
2574     @return: four element tuple of:
2575        - run status (one of the IARUN_ constants)
2576        - stdout
2577        - stderr
2578        - fail reason (as from L{utils.RunResult})
2579
2580     """
2581     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2582                                   os.path.isfile)
2583     if alloc_script is None:
2584       return (constants.IARUN_NOTFOUND, None, None, None)
2585
2586     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2587     try:
2588       os.write(fd, idata)
2589       os.close(fd)
2590       result = utils.RunCmd([alloc_script, fin_name])
2591       if result.failed:
2592         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2593                 result.fail_reason)
2594     finally:
2595       os.unlink(fin_name)
2596
2597     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2598
2599
2600 class DevCacheManager(object):
2601   """Simple class for managing a cache of block device information.
2602
2603   """
2604   _DEV_PREFIX = "/dev/"
2605   _ROOT_DIR = constants.BDEV_CACHE_DIR
2606
2607   @classmethod
2608   def _ConvertPath(cls, dev_path):
2609     """Converts a /dev/name path to the cache file name.
2610
2611     This replaces slashes with underscores and strips the /dev
2612     prefix. It then returns the full path to the cache file.
2613
2614     @type dev_path: str
2615     @param dev_path: the C{/dev/} path name
2616     @rtype: str
2617     @return: the converted path name
2618
2619     """
2620     if dev_path.startswith(cls._DEV_PREFIX):
2621       dev_path = dev_path[len(cls._DEV_PREFIX):]
2622     dev_path = dev_path.replace("/", "_")
2623     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2624     return fpath
2625
2626   @classmethod
2627   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2628     """Updates the cache information for a given device.
2629
2630     @type dev_path: str
2631     @param dev_path: the pathname of the device
2632     @type owner: str
2633     @param owner: the owner (instance name) of the device
2634     @type on_primary: bool
2635     @param on_primary: whether this is the primary
2636         node nor not
2637     @type iv_name: str
2638     @param iv_name: the instance-visible name of the
2639         device, as in objects.Disk.iv_name
2640
2641     @rtype: None
2642
2643     """
2644     if dev_path is None:
2645       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2646       return
2647     fpath = cls._ConvertPath(dev_path)
2648     if on_primary:
2649       state = "primary"
2650     else:
2651       state = "secondary"
2652     if iv_name is None:
2653       iv_name = "not_visible"
2654     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2655     try:
2656       utils.WriteFile(fpath, data=fdata)
2657     except EnvironmentError, err:
2658       logging.exception("Can't update bdev cache for %s", dev_path)
2659
2660   @classmethod
2661   def RemoveCache(cls, dev_path):
2662     """Remove data for a dev_path.
2663
2664     This is just a wrapper over L{utils.RemoveFile} with a converted
2665     path name and logging.
2666
2667     @type dev_path: str
2668     @param dev_path: the pathname of the device
2669
2670     @rtype: None
2671
2672     """
2673     if dev_path is None:
2674       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2675       return
2676     fpath = cls._ConvertPath(dev_path)
2677     try:
2678       utils.RemoveFile(fpath)
2679     except EnvironmentError, err:
2680       logging.exception("Can't update bdev cache for %s", dev_path)