831f36cf16dcbfa19f27f554f62c679e155b448b
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different informations about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError, err:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = str(err)
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError, err:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       raise
579
580   return results
581
582
583 def GetInstanceInfo(instance, hname):
584   """Gives back the informations about an instance as a dictionary.
585
586   @type instance: string
587   @param instance: the instance name
588   @type hname: string
589   @param hname: the hypervisor type of the instance
590
591   @rtype: dict
592   @return: dictionary with the following keys:
593       - memory: memory size of instance (int)
594       - state: xen state of instance (string)
595       - time: cpu time of instance (float)
596
597   """
598   output = {}
599
600   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
601   if iinfo is not None:
602     output['memory'] = iinfo[2]
603     output['state'] = iinfo[4]
604     output['time'] = iinfo[5]
605
606   return output
607
608
609 def GetInstanceMigratable(instance):
610   """Gives whether an instance can be migrated.
611
612   @type instance: L{objects.Instance}
613   @param instance: object representing the instance to be checked.
614
615   @rtype: tuple
616   @return: tuple of (result, description) where:
617       - result: whether the instance can be migrated or not
618       - description: a description of the issue, if relevant
619
620   """
621   hyper = hypervisor.GetHypervisor(instance.hypervisor)
622   if instance.name not in hyper.ListInstances():
623     return (False, 'not running')
624
625   for idx in range(len(instance.disks)):
626     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
627     if not os.path.islink(link_name):
628       return (False, 'not restarted since ganeti 1.2.5')
629
630   return (True, '')
631
632
633 def GetAllInstancesInfo(hypervisor_list):
634   """Gather data about all instances.
635
636   This is the equivalent of L{GetInstanceInfo}, except that it
637   computes data for all instances at once, thus being faster if one
638   needs data about more than one instance.
639
640   @type hypervisor_list: list
641   @param hypervisor_list: list of hypervisors to query for instance data
642
643   @rtype: dict
644   @return: dictionary of instance: data, with data having the following keys:
645       - memory: memory size of instance (int)
646       - state: xen state of instance (string)
647       - time: cpu time of instance (float)
648       - vcpus: the number of vcpus
649
650   """
651   output = {}
652
653   for hname in hypervisor_list:
654     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
655     if iinfo:
656       for name, inst_id, memory, vcpus, state, times in iinfo:
657         value = {
658           'memory': memory,
659           'vcpus': vcpus,
660           'state': state,
661           'time': times,
662           }
663         if name in output and output[name] != value:
664           raise errors.HypervisorError("Instance %s running duplicate"
665                                        " with different parameters" % name)
666         output[name] = value
667
668   return output
669
670
671 def InstanceOsAdd(instance):
672   """Add an OS to an instance.
673
674   @type instance: L{objects.Instance}
675   @param instance: Instance whose OS is to be installed
676   @rtype: boolean
677   @return: the success of the operation
678
679   """
680   try:
681     inst_os = OSFromDisk(instance.os)
682   except errors.InvalidOS, err:
683     os_name, os_dir, os_err = err.args
684     if os_dir is None:
685       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
686     else:
687       return (False, "Error parsing OS '%s' in directory %s: %s" %
688               (os_name, os_dir, os_err))
689
690   create_env = OSEnvironment(instance)
691
692   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
693                                      instance.name, int(time.time()))
694
695   result = utils.RunCmd([inst_os.create_script], env=create_env,
696                         cwd=inst_os.path, output=logfile,)
697   if result.failed:
698     logging.error("os create command '%s' returned error: %s, logfile: %s,"
699                   " output: %s", result.cmd, result.fail_reason, logfile,
700                   result.output)
701     lines = [utils.SafeEncode(val)
702              for val in utils.TailFile(logfile, lines=20)]
703     return (False, "OS create script failed (%s), last lines in the"
704             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
705
706   return (True, "Successfully installed")
707
708
709 def RunRenameInstance(instance, old_name):
710   """Run the OS rename script for an instance.
711
712   @type instance: L{objects.Instance}
713   @param instance: Instance whose OS is to be installed
714   @type old_name: string
715   @param old_name: previous instance name
716   @rtype: boolean
717   @return: the success of the operation
718
719   """
720   inst_os = OSFromDisk(instance.os)
721
722   rename_env = OSEnvironment(instance)
723   rename_env['OLD_INSTANCE_NAME'] = old_name
724
725   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
726                                            old_name,
727                                            instance.name, int(time.time()))
728
729   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
730                         cwd=inst_os.path, output=logfile)
731
732   if result.failed:
733     logging.error("os create command '%s' returned error: %s output: %s",
734                   result.cmd, result.fail_reason, result.output)
735     lines = [utils.SafeEncode(val)
736              for val in utils.TailFile(logfile, lines=20)]
737     return (False, "OS rename script failed (%s), last lines in the"
738             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
739
740   return (True, "Rename successful")
741
742
743 def _GetVGInfo(vg_name):
744   """Get informations about the volume group.
745
746   @type vg_name: str
747   @param vg_name: the volume group which we query
748   @rtype: dict
749   @return:
750     A dictionary with the following keys:
751       - C{vg_size} is the total size of the volume group in MiB
752       - C{vg_free} is the free size of the volume group in MiB
753       - C{pv_count} are the number of physical disks in that VG
754
755     If an error occurs during gathering of data, we return the same dict
756     with keys all set to None.
757
758   """
759   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
760
761   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
762                          "--nosuffix", "--units=m", "--separator=:", vg_name])
763
764   if retval.failed:
765     logging.error("volume group %s not present", vg_name)
766     return retdic
767   valarr = retval.stdout.strip().rstrip(':').split(':')
768   if len(valarr) == 3:
769     try:
770       retdic = {
771         "vg_size": int(round(float(valarr[0]), 0)),
772         "vg_free": int(round(float(valarr[1]), 0)),
773         "pv_count": int(valarr[2]),
774         }
775     except ValueError, err:
776       logging.exception("Fail to parse vgs output")
777   else:
778     logging.error("vgs output has the wrong number of fields (expected"
779                   " three): %s", str(valarr))
780   return retdic
781
782
783 def _GetBlockDevSymlinkPath(instance_name, idx):
784   return os.path.join(constants.DISK_LINKS_DIR,
785                       "%s:%d" % (instance_name, idx))
786
787
788 def _SymlinkBlockDev(instance_name, device_path, idx):
789   """Set up symlinks to a instance's block device.
790
791   This is an auxiliary function run when an instance is start (on the primary
792   node) or when an instance is migrated (on the target node).
793
794
795   @param instance_name: the name of the target instance
796   @param device_path: path of the physical block device, on the node
797   @param idx: the disk index
798   @return: absolute path to the disk's symlink
799
800   """
801   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
802   try:
803     os.symlink(device_path, link_name)
804   except OSError, err:
805     if err.errno == errno.EEXIST:
806       if (not os.path.islink(link_name) or
807           os.readlink(link_name) != device_path):
808         os.remove(link_name)
809         os.symlink(device_path, link_name)
810     else:
811       raise
812
813   return link_name
814
815
816 def _RemoveBlockDevLinks(instance_name, disks):
817   """Remove the block device symlinks belonging to the given instance.
818
819   """
820   for idx, disk in enumerate(disks):
821     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
822     if os.path.islink(link_name):
823       try:
824         os.remove(link_name)
825       except OSError:
826         logging.exception("Can't remove symlink '%s'", link_name)
827
828
829 def _GatherAndLinkBlockDevs(instance):
830   """Set up an instance's block device(s).
831
832   This is run on the primary node at instance startup. The block
833   devices must be already assembled.
834
835   @type instance: L{objects.Instance}
836   @param instance: the instance whose disks we shoul assemble
837   @rtype: list
838   @return: list of (disk_object, device_path)
839
840   """
841   block_devices = []
842   for idx, disk in enumerate(instance.disks):
843     device = _RecursiveFindBD(disk)
844     if device is None:
845       raise errors.BlockDeviceError("Block device '%s' is not set up." %
846                                     str(disk))
847     device.Open()
848     try:
849       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
850     except OSError, e:
851       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
852                                     e.strerror)
853
854     block_devices.append((disk, link_name))
855
856   return block_devices
857
858
859 def StartInstance(instance):
860   """Start an instance.
861
862   @type instance: L{objects.Instance}
863   @param instance: the instance object
864   @rtype: boolean
865   @return: whether the startup was successful or not
866
867   """
868   running_instances = GetInstanceList([instance.hypervisor])
869
870   if instance.name in running_instances:
871     return (True, "Already running")
872
873   try:
874     block_devices = _GatherAndLinkBlockDevs(instance)
875     hyper = hypervisor.GetHypervisor(instance.hypervisor)
876     hyper.StartInstance(instance, block_devices)
877   except errors.BlockDeviceError, err:
878     logging.exception("Failed to start instance")
879     return (False, "Block device error: %s" % str(err))
880   except errors.HypervisorError, err:
881     logging.exception("Failed to start instance")
882     _RemoveBlockDevLinks(instance.name, instance.disks)
883     return (False, "Hypervisor error: %s" % str(err))
884
885   return (True, "Instance started successfully")
886
887
888 def InstanceShutdown(instance):
889   """Shut an instance down.
890
891   @note: this functions uses polling with a hardcoded timeout.
892
893   @type instance: L{objects.Instance}
894   @param instance: the instance object
895   @rtype: boolean
896   @return: whether the startup was successful or not
897
898   """
899   hv_name = instance.hypervisor
900   running_instances = GetInstanceList([hv_name])
901
902   if instance.name not in running_instances:
903     return (True, "Instance already stopped")
904
905   hyper = hypervisor.GetHypervisor(hv_name)
906   try:
907     hyper.StopInstance(instance)
908   except errors.HypervisorError, err:
909     msg = "Failed to stop instance %s: %s" % (instance.name, err)
910     logging.error(msg)
911     return (False, msg)
912
913   # test every 10secs for 2min
914
915   time.sleep(1)
916   for dummy in range(11):
917     if instance.name not in GetInstanceList([hv_name]):
918       break
919     time.sleep(10)
920   else:
921     # the shutdown did not succeed
922     logging.error("Shutdown of '%s' unsuccessful, using destroy",
923                   instance.name)
924
925     try:
926       hyper.StopInstance(instance, force=True)
927     except errors.HypervisorError, err:
928       msg = "Failed to force stop instance %s: %s" % (instance.name, err)
929       logging.error(msg)
930       return (False, msg)
931
932     time.sleep(1)
933     if instance.name in GetInstanceList([hv_name]):
934       msg = ("Could not shutdown instance %s even by destroy" %
935              instance.name)
936       logging.error(msg)
937       return (False, msg)
938
939   _RemoveBlockDevLinks(instance.name, instance.disks)
940
941   return (True, "Instance has been shutdown successfully")
942
943
944 def InstanceReboot(instance, reboot_type):
945   """Reboot an instance.
946
947   @type instance: L{objects.Instance}
948   @param instance: the instance object to reboot
949   @type reboot_type: str
950   @param reboot_type: the type of reboot, one the following
951     constants:
952       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
953         instance OS, do not recreate the VM
954       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
955         restart the VM (at the hypervisor level)
956       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
957         is not accepted here, since that mode is handled
958         differently
959   @rtype: boolean
960   @return: the success of the operation
961
962   """
963   running_instances = GetInstanceList([instance.hypervisor])
964
965   if instance.name not in running_instances:
966     msg = "Cannot reboot instance %s that is not running" % instance.name
967     logging.error(msg)
968     return (False, msg)
969
970   hyper = hypervisor.GetHypervisor(instance.hypervisor)
971   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
972     try:
973       hyper.RebootInstance(instance)
974     except errors.HypervisorError, err:
975       msg = "Failed to soft reboot instance %s: %s" % (instance.name, err)
976       logging.error(msg)
977       return (False, msg)
978   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
979     try:
980       stop_result = InstanceShutdown(instance)
981       if not stop_result[0]:
982         return stop_result
983       return StartInstance(instance)
984     except errors.HypervisorError, err:
985       msg = "Failed to hard reboot instance %s: %s" % (instance.name, err)
986       logging.error(msg)
987       return (False, msg)
988   else:
989     return (False, "Invalid reboot_type received: %s" % (reboot_type,))
990
991   return (True, "Reboot successful")
992
993
994 def MigrationInfo(instance):
995   """Gather information about an instance to be migrated.
996
997   @type instance: L{objects.Instance}
998   @param instance: the instance definition
999
1000   """
1001   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1002   try:
1003     info = hyper.MigrationInfo(instance)
1004   except errors.HypervisorError, err:
1005     msg = "Failed to fetch migration information"
1006     logging.exception(msg)
1007     return (False, '%s: %s' % (msg, err))
1008   return (True, info)
1009
1010
1011 def AcceptInstance(instance, info, target):
1012   """Prepare the node to accept an instance.
1013
1014   @type instance: L{objects.Instance}
1015   @param instance: the instance definition
1016   @type info: string/data (opaque)
1017   @param info: migration information, from the source node
1018   @type target: string
1019   @param target: target host (usually ip), on this node
1020
1021   """
1022   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1023   try:
1024     hyper.AcceptInstance(instance, info, target)
1025   except errors.HypervisorError, err:
1026     msg = "Failed to accept instance"
1027     logging.exception(msg)
1028     return (False, '%s: %s' % (msg, err))
1029   return (True, "Accept successfull")
1030
1031
1032 def FinalizeMigration(instance, info, success):
1033   """Finalize any preparation to accept an instance.
1034
1035   @type instance: L{objects.Instance}
1036   @param instance: the instance definition
1037   @type info: string/data (opaque)
1038   @param info: migration information, from the source node
1039   @type success: boolean
1040   @param success: whether the migration was a success or a failure
1041
1042   """
1043   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1044   try:
1045     hyper.FinalizeMigration(instance, info, success)
1046   except errors.HypervisorError, err:
1047     msg = "Failed to finalize migration"
1048     logging.exception(msg)
1049     return (False, '%s: %s' % (msg, err))
1050   return (True, "Migration Finalized")
1051
1052
1053 def MigrateInstance(instance, target, live):
1054   """Migrates an instance to another node.
1055
1056   @type instance: L{objects.Instance}
1057   @param instance: the instance definition
1058   @type target: string
1059   @param target: the target node name
1060   @type live: boolean
1061   @param live: whether the migration should be done live or not (the
1062       interpretation of this parameter is left to the hypervisor)
1063   @rtype: tuple
1064   @return: a tuple of (success, msg) where:
1065       - succes is a boolean denoting the success/failure of the operation
1066       - msg is a string with details in case of failure
1067
1068   """
1069   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1070
1071   try:
1072     hyper.MigrateInstance(instance.name, target, live)
1073   except errors.HypervisorError, err:
1074     msg = "Failed to migrate instance"
1075     logging.exception(msg)
1076     return (False, "%s: %s" % (msg, err))
1077   return (True, "Migration successfull")
1078
1079
1080 def BlockdevCreate(disk, size, owner, on_primary, info):
1081   """Creates a block device for an instance.
1082
1083   @type disk: L{objects.Disk}
1084   @param disk: the object describing the disk we should create
1085   @type size: int
1086   @param size: the size of the physical underlying device, in MiB
1087   @type owner: str
1088   @param owner: the name of the instance for which disk is created,
1089       used for device cache data
1090   @type on_primary: boolean
1091   @param on_primary:  indicates if it is the primary node or not
1092   @type info: string
1093   @param info: string that will be sent to the physical device
1094       creation, used for example to set (LVM) tags on LVs
1095
1096   @return: the new unique_id of the device (this can sometime be
1097       computed only after creation), or None. On secondary nodes,
1098       it's not required to return anything.
1099
1100   """
1101   clist = []
1102   if disk.children:
1103     for child in disk.children:
1104       try:
1105         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1106       except errors.BlockDeviceError, err:
1107         errmsg = "Can't assemble device %s: %s" % (child, err)
1108         logging.error(errmsg)
1109         return False, errmsg
1110       if on_primary or disk.AssembleOnSecondary():
1111         # we need the children open in case the device itself has to
1112         # be assembled
1113         try:
1114           crdev.Open()
1115         except errors.BlockDeviceError, err:
1116           errmsg = "Can't make child '%s' read-write: %s" % (child, err)
1117           logging.error(errmsg)
1118           return False, errmsg
1119       clist.append(crdev)
1120
1121   try:
1122     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1123   except errors.BlockDeviceError, err:
1124     return False, "Can't create block device: %s" % str(err)
1125
1126   if on_primary or disk.AssembleOnSecondary():
1127     try:
1128       device.Assemble()
1129     except errors.BlockDeviceError, err:
1130       errmsg = ("Can't assemble device after creation, very"
1131                 " unusual event: %s" % str(err))
1132       logging.error(errmsg)
1133       return False, errmsg
1134     device.SetSyncSpeed(constants.SYNC_SPEED)
1135     if on_primary or disk.OpenOnSecondary():
1136       try:
1137         device.Open(force=True)
1138       except errors.BlockDeviceError, err:
1139         errmsg = ("Can't make device r/w after creation, very"
1140                   " unusual event: %s" % str(err))
1141         logging.error(errmsg)
1142         return False, errmsg
1143     DevCacheManager.UpdateCache(device.dev_path, owner,
1144                                 on_primary, disk.iv_name)
1145
1146   device.SetInfo(info)
1147
1148   physical_id = device.unique_id
1149   return True, physical_id
1150
1151
1152 def BlockdevRemove(disk):
1153   """Remove a block device.
1154
1155   @note: This is intended to be called recursively.
1156
1157   @type disk: L{objects.Disk}
1158   @param disk: the disk object we should remove
1159   @rtype: boolean
1160   @return: the success of the operation
1161
1162   """
1163   msgs = []
1164   result = True
1165   try:
1166     rdev = _RecursiveFindBD(disk)
1167   except errors.BlockDeviceError, err:
1168     # probably can't attach
1169     logging.info("Can't attach to device %s in remove", disk)
1170     rdev = None
1171   if rdev is not None:
1172     r_path = rdev.dev_path
1173     try:
1174       rdev.Remove()
1175     except errors.BlockDeviceError, err:
1176       msgs.append(str(err))
1177       result = False
1178     if result:
1179       DevCacheManager.RemoveCache(r_path)
1180
1181   if disk.children:
1182     for child in disk.children:
1183       c_status, c_msg = BlockdevRemove(child)
1184       result = result and c_status
1185       if c_msg: # not an empty message
1186         msgs.append(c_msg)
1187
1188   return (result, "; ".join(msgs))
1189
1190
1191 def _RecursiveAssembleBD(disk, owner, as_primary):
1192   """Activate a block device for an instance.
1193
1194   This is run on the primary and secondary nodes for an instance.
1195
1196   @note: this function is called recursively.
1197
1198   @type disk: L{objects.Disk}
1199   @param disk: the disk we try to assemble
1200   @type owner: str
1201   @param owner: the name of the instance which owns the disk
1202   @type as_primary: boolean
1203   @param as_primary: if we should make the block device
1204       read/write
1205
1206   @return: the assembled device or None (in case no device
1207       was assembled)
1208   @raise errors.BlockDeviceError: in case there is an error
1209       during the activation of the children or the device
1210       itself
1211
1212   """
1213   children = []
1214   if disk.children:
1215     mcn = disk.ChildrenNeeded()
1216     if mcn == -1:
1217       mcn = 0 # max number of Nones allowed
1218     else:
1219       mcn = len(disk.children) - mcn # max number of Nones
1220     for chld_disk in disk.children:
1221       try:
1222         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1223       except errors.BlockDeviceError, err:
1224         if children.count(None) >= mcn:
1225           raise
1226         cdev = None
1227         logging.error("Error in child activation (but continuing): %s",
1228                       str(err))
1229       children.append(cdev)
1230
1231   if as_primary or disk.AssembleOnSecondary():
1232     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1233     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1234     result = r_dev
1235     if as_primary or disk.OpenOnSecondary():
1236       r_dev.Open()
1237     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1238                                 as_primary, disk.iv_name)
1239
1240   else:
1241     result = True
1242   return result
1243
1244
1245 def BlockdevAssemble(disk, owner, as_primary):
1246   """Activate a block device for an instance.
1247
1248   This is a wrapper over _RecursiveAssembleBD.
1249
1250   @rtype: str or boolean
1251   @return: a C{/dev/...} path for primary nodes, and
1252       C{True} for secondary nodes
1253
1254   """
1255   status = True
1256   result = "no error information"
1257   try:
1258     result = _RecursiveAssembleBD(disk, owner, as_primary)
1259     if isinstance(result, bdev.BlockDev):
1260       result = result.dev_path
1261   except errors.BlockDeviceError, err:
1262     result = "Error while assembling disk: %s" % str(err)
1263     status = False
1264   return (status, result)
1265
1266
1267 def BlockdevShutdown(disk):
1268   """Shut down a block device.
1269
1270   First, if the device is assembled (Attach() is successfull), then
1271   the device is shutdown. Then the children of the device are
1272   shutdown.
1273
1274   This function is called recursively. Note that we don't cache the
1275   children or such, as oppossed to assemble, shutdown of different
1276   devices doesn't require that the upper device was active.
1277
1278   @type disk: L{objects.Disk}
1279   @param disk: the description of the disk we should
1280       shutdown
1281   @rtype: boolean
1282   @return: the success of the operation
1283
1284   """
1285   msgs = []
1286   result = True
1287   r_dev = _RecursiveFindBD(disk)
1288   if r_dev is not None:
1289     r_path = r_dev.dev_path
1290     try:
1291       r_dev.Shutdown()
1292       DevCacheManager.RemoveCache(r_path)
1293     except errors.BlockDeviceError, err:
1294       msgs.append(str(err))
1295       result = False
1296
1297   if disk.children:
1298     for child in disk.children:
1299       c_status, c_msg = BlockdevShutdown(child)
1300       result = result and c_status
1301       if c_msg: # not an empty message
1302         msgs.append(c_msg)
1303
1304   return (result, "; ".join(msgs))
1305
1306
1307 def BlockdevAddchildren(parent_cdev, new_cdevs):
1308   """Extend a mirrored block device.
1309
1310   @type parent_cdev: L{objects.Disk}
1311   @param parent_cdev: the disk to which we should add children
1312   @type new_cdevs: list of L{objects.Disk}
1313   @param new_cdevs: the list of children which we should add
1314   @rtype: boolean
1315   @return: the success of the operation
1316
1317   """
1318   parent_bdev = _RecursiveFindBD(parent_cdev)
1319   if parent_bdev is None:
1320     logging.error("Can't find parent device")
1321     return False
1322   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1323   if new_bdevs.count(None) > 0:
1324     logging.error("Can't find new device(s) to add: %s:%s",
1325                   new_bdevs, new_cdevs)
1326     return False
1327   parent_bdev.AddChildren(new_bdevs)
1328   return True
1329
1330
1331 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1332   """Shrink a mirrored block device.
1333
1334   @type parent_cdev: L{objects.Disk}
1335   @param parent_cdev: the disk from which we should remove children
1336   @type new_cdevs: list of L{objects.Disk}
1337   @param new_cdevs: the list of children which we should remove
1338   @rtype: boolean
1339   @return: the success of the operation
1340
1341   """
1342   parent_bdev = _RecursiveFindBD(parent_cdev)
1343   if parent_bdev is None:
1344     logging.error("Can't find parent in remove children: %s", parent_cdev)
1345     return False
1346   devs = []
1347   for disk in new_cdevs:
1348     rpath = disk.StaticDevPath()
1349     if rpath is None:
1350       bd = _RecursiveFindBD(disk)
1351       if bd is None:
1352         logging.error("Can't find dynamic device %s while removing children",
1353                       disk)
1354         return False
1355       else:
1356         devs.append(bd.dev_path)
1357     else:
1358       devs.append(rpath)
1359   parent_bdev.RemoveChildren(devs)
1360   return True
1361
1362
1363 def BlockdevGetmirrorstatus(disks):
1364   """Get the mirroring status of a list of devices.
1365
1366   @type disks: list of L{objects.Disk}
1367   @param disks: the list of disks which we should query
1368   @rtype: disk
1369   @return:
1370       a list of (mirror_done, estimated_time) tuples, which
1371       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1372   @raise errors.BlockDeviceError: if any of the disks cannot be
1373       found
1374
1375   """
1376   stats = []
1377   for dsk in disks:
1378     rbd = _RecursiveFindBD(dsk)
1379     if rbd is None:
1380       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1381     stats.append(rbd.CombinedSyncStatus())
1382   return stats
1383
1384
1385 def _RecursiveFindBD(disk):
1386   """Check if a device is activated.
1387
1388   If so, return informations about the real device.
1389
1390   @type disk: L{objects.Disk}
1391   @param disk: the disk object we need to find
1392
1393   @return: None if the device can't be found,
1394       otherwise the device instance
1395
1396   """
1397   children = []
1398   if disk.children:
1399     for chdisk in disk.children:
1400       children.append(_RecursiveFindBD(chdisk))
1401
1402   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1403
1404
1405 def BlockdevFind(disk):
1406   """Check if a device is activated.
1407
1408   If it is, return informations about the real device.
1409
1410   @type disk: L{objects.Disk}
1411   @param disk: the disk to find
1412   @rtype: None or tuple
1413   @return: None if the disk cannot be found, otherwise a
1414       tuple (device_path, major, minor, sync_percent,
1415       estimated_time, is_degraded)
1416
1417   """
1418   try:
1419     rbd = _RecursiveFindBD(disk)
1420   except errors.BlockDeviceError, err:
1421     return (False, str(err))
1422   if rbd is None:
1423     return (True, None)
1424   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1425
1426
1427 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1428   """Write a file to the filesystem.
1429
1430   This allows the master to overwrite(!) a file. It will only perform
1431   the operation if the file belongs to a list of configuration files.
1432
1433   @type file_name: str
1434   @param file_name: the target file name
1435   @type data: str
1436   @param data: the new contents of the file
1437   @type mode: int
1438   @param mode: the mode to give the file (can be None)
1439   @type uid: int
1440   @param uid: the owner of the file (can be -1 for default)
1441   @type gid: int
1442   @param gid: the group of the file (can be -1 for default)
1443   @type atime: float
1444   @param atime: the atime to set on the file (can be None)
1445   @type mtime: float
1446   @param mtime: the mtime to set on the file (can be None)
1447   @rtype: boolean
1448   @return: the success of the operation; errors are logged
1449       in the node daemon log
1450
1451   """
1452   if not os.path.isabs(file_name):
1453     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1454                   file_name)
1455     return False
1456
1457   allowed_files = [
1458     constants.CLUSTER_CONF_FILE,
1459     constants.ETC_HOSTS,
1460     constants.SSH_KNOWN_HOSTS_FILE,
1461     constants.VNC_PASSWORD_FILE,
1462     ]
1463
1464   if file_name not in allowed_files:
1465     logging.error("Filename passed to UploadFile not in allowed"
1466                  " upload targets: '%s'", file_name)
1467     return False
1468
1469   raw_data = _Decompress(data)
1470
1471   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1472                   atime=atime, mtime=mtime)
1473   return True
1474
1475
1476 def WriteSsconfFiles(values):
1477   """Update all ssconf files.
1478
1479   Wrapper around the SimpleStore.WriteFiles.
1480
1481   """
1482   ssconf.SimpleStore().WriteFiles(values)
1483
1484
1485 def _ErrnoOrStr(err):
1486   """Format an EnvironmentError exception.
1487
1488   If the L{err} argument has an errno attribute, it will be looked up
1489   and converted into a textual C{E...} description. Otherwise the
1490   string representation of the error will be returned.
1491
1492   @type err: L{EnvironmentError}
1493   @param err: the exception to format
1494
1495   """
1496   if hasattr(err, 'errno'):
1497     detail = errno.errorcode[err.errno]
1498   else:
1499     detail = str(err)
1500   return detail
1501
1502
1503 def _OSOndiskVersion(name, os_dir):
1504   """Compute and return the API version of a given OS.
1505
1506   This function will try to read the API version of the OS given by
1507   the 'name' parameter and residing in the 'os_dir' directory.
1508
1509   @type name: str
1510   @param name: the OS name we should look for
1511   @type os_dir: str
1512   @param os_dir: the directory inwhich we should look for the OS
1513   @rtype: int or None
1514   @return:
1515       Either an integer denoting the version or None in the
1516       case when this is not a valid OS name.
1517   @raise errors.InvalidOS: if the OS cannot be found
1518
1519   """
1520   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1521
1522   try:
1523     st = os.stat(api_file)
1524   except EnvironmentError, err:
1525     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1526                            " found (%s)" % _ErrnoOrStr(err))
1527
1528   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1529     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1530                            " a regular file")
1531
1532   try:
1533     f = open(api_file)
1534     try:
1535       api_versions = f.readlines()
1536     finally:
1537       f.close()
1538   except EnvironmentError, err:
1539     raise errors.InvalidOS(name, os_dir, "error while reading the"
1540                            " API version (%s)" % _ErrnoOrStr(err))
1541
1542   api_versions = [version.strip() for version in api_versions]
1543   try:
1544     api_versions = [int(version) for version in api_versions]
1545   except (TypeError, ValueError), err:
1546     raise errors.InvalidOS(name, os_dir,
1547                            "API version is not integer (%s)" % str(err))
1548
1549   return api_versions
1550
1551
1552 def DiagnoseOS(top_dirs=None):
1553   """Compute the validity for all OSes.
1554
1555   @type top_dirs: list
1556   @param top_dirs: the list of directories in which to
1557       search (if not given defaults to
1558       L{constants.OS_SEARCH_PATH})
1559   @rtype: list of L{objects.OS}
1560   @return: an OS object for each name in all the given
1561       directories
1562
1563   """
1564   if top_dirs is None:
1565     top_dirs = constants.OS_SEARCH_PATH
1566
1567   result = []
1568   for dir_name in top_dirs:
1569     if os.path.isdir(dir_name):
1570       try:
1571         f_names = utils.ListVisibleFiles(dir_name)
1572       except EnvironmentError, err:
1573         logging.exception("Can't list the OS directory %s", dir_name)
1574         break
1575       for name in f_names:
1576         try:
1577           os_inst = OSFromDisk(name, base_dir=dir_name)
1578           result.append(os_inst)
1579         except errors.InvalidOS, err:
1580           result.append(objects.OS.FromInvalidOS(err))
1581
1582   return result
1583
1584
1585 def OSFromDisk(name, base_dir=None):
1586   """Create an OS instance from disk.
1587
1588   This function will return an OS instance if the given name is a
1589   valid OS name. Otherwise, it will raise an appropriate
1590   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1591
1592   @type base_dir: string
1593   @keyword base_dir: Base directory containing OS installations.
1594                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1595   @rtype: L{objects.OS}
1596   @return: the OS instance if we find a valid one
1597   @raise errors.InvalidOS: if we don't find a valid OS
1598
1599   """
1600   if base_dir is None:
1601     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1602     if os_dir is None:
1603       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1604   else:
1605     os_dir = os.path.sep.join([base_dir, name])
1606
1607   api_versions = _OSOndiskVersion(name, os_dir)
1608
1609   if constants.OS_API_VERSION not in api_versions:
1610     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1611                            " (found %s want %s)"
1612                            % (api_versions, constants.OS_API_VERSION))
1613
1614   # OS Scripts dictionary, we will populate it with the actual script names
1615   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1616
1617   for script in os_scripts:
1618     os_scripts[script] = os.path.sep.join([os_dir, script])
1619
1620     try:
1621       st = os.stat(os_scripts[script])
1622     except EnvironmentError, err:
1623       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1624                              (script, _ErrnoOrStr(err)))
1625
1626     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1627       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1628                              script)
1629
1630     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1631       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1632                              script)
1633
1634
1635   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1636                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1637                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1638                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1639                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1640                     api_versions=api_versions)
1641
1642 def OSEnvironment(instance, debug=0):
1643   """Calculate the environment for an os script.
1644
1645   @type instance: L{objects.Instance}
1646   @param instance: target instance for the os script run
1647   @type debug: integer
1648   @param debug: debug level (0 or 1, for OS Api 10)
1649   @rtype: dict
1650   @return: dict of environment variables
1651   @raise errors.BlockDeviceError: if the block device
1652       cannot be found
1653
1654   """
1655   result = {}
1656   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1657   result['INSTANCE_NAME'] = instance.name
1658   result['INSTANCE_OS'] = instance.os
1659   result['HYPERVISOR'] = instance.hypervisor
1660   result['DISK_COUNT'] = '%d' % len(instance.disks)
1661   result['NIC_COUNT'] = '%d' % len(instance.nics)
1662   result['DEBUG_LEVEL'] = '%d' % debug
1663   for idx, disk in enumerate(instance.disks):
1664     real_disk = _RecursiveFindBD(disk)
1665     if real_disk is None:
1666       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1667                                     str(disk))
1668     real_disk.Open()
1669     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1670     result['DISK_%d_ACCESS' % idx] = disk.mode
1671     if constants.HV_DISK_TYPE in instance.hvparams:
1672       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1673         instance.hvparams[constants.HV_DISK_TYPE]
1674     if disk.dev_type in constants.LDS_BLOCK:
1675       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1676     elif disk.dev_type == constants.LD_FILE:
1677       result['DISK_%d_BACKEND_TYPE' % idx] = \
1678         'file:%s' % disk.physical_id[0]
1679   for idx, nic in enumerate(instance.nics):
1680     result['NIC_%d_MAC' % idx] = nic.mac
1681     if nic.ip:
1682       result['NIC_%d_IP' % idx] = nic.ip
1683     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1684     if constants.HV_NIC_TYPE in instance.hvparams:
1685       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1686         instance.hvparams[constants.HV_NIC_TYPE]
1687
1688   return result
1689
1690 def BlockdevGrow(disk, amount):
1691   """Grow a stack of block devices.
1692
1693   This function is called recursively, with the childrens being the
1694   first ones to resize.
1695
1696   @type disk: L{objects.Disk}
1697   @param disk: the disk to be grown
1698   @rtype: (status, result)
1699   @return: a tuple with the status of the operation
1700       (True/False), and the errors message if status
1701       is False
1702
1703   """
1704   r_dev = _RecursiveFindBD(disk)
1705   if r_dev is None:
1706     return False, "Cannot find block device %s" % (disk,)
1707
1708   try:
1709     r_dev.Grow(amount)
1710   except errors.BlockDeviceError, err:
1711     return False, str(err)
1712
1713   return True, None
1714
1715
1716 def BlockdevSnapshot(disk):
1717   """Create a snapshot copy of a block device.
1718
1719   This function is called recursively, and the snapshot is actually created
1720   just for the leaf lvm backend device.
1721
1722   @type disk: L{objects.Disk}
1723   @param disk: the disk to be snapshotted
1724   @rtype: string
1725   @return: snapshot disk path
1726
1727   """
1728   if disk.children:
1729     if len(disk.children) == 1:
1730       # only one child, let's recurse on it
1731       return BlockdevSnapshot(disk.children[0])
1732     else:
1733       # more than one child, choose one that matches
1734       for child in disk.children:
1735         if child.size == disk.size:
1736           # return implies breaking the loop
1737           return BlockdevSnapshot(child)
1738   elif disk.dev_type == constants.LD_LV:
1739     r_dev = _RecursiveFindBD(disk)
1740     if r_dev is not None:
1741       # let's stay on the safe side and ask for the full size, for now
1742       return r_dev.Snapshot(disk.size)
1743     else:
1744       return None
1745   else:
1746     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1747                                  " '%s' of type '%s'" %
1748                                  (disk.unique_id, disk.dev_type))
1749
1750
1751 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1752   """Export a block device snapshot to a remote node.
1753
1754   @type disk: L{objects.Disk}
1755   @param disk: the description of the disk to export
1756   @type dest_node: str
1757   @param dest_node: the destination node to export to
1758   @type instance: L{objects.Instance}
1759   @param instance: the instance object to whom the disk belongs
1760   @type cluster_name: str
1761   @param cluster_name: the cluster name, needed for SSH hostalias
1762   @type idx: int
1763   @param idx: the index of the disk in the instance's disk list,
1764       used to export to the OS scripts environment
1765   @rtype: boolean
1766   @return: the success of the operation
1767
1768   """
1769   export_env = OSEnvironment(instance)
1770
1771   inst_os = OSFromDisk(instance.os)
1772   export_script = inst_os.export_script
1773
1774   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1775                                      instance.name, int(time.time()))
1776   if not os.path.exists(constants.LOG_OS_DIR):
1777     os.mkdir(constants.LOG_OS_DIR, 0750)
1778   real_disk = _RecursiveFindBD(disk)
1779   if real_disk is None:
1780     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1781                                   str(disk))
1782   real_disk.Open()
1783
1784   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1785   export_env['EXPORT_INDEX'] = str(idx)
1786
1787   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1788   destfile = disk.physical_id[1]
1789
1790   # the target command is built out of three individual commands,
1791   # which are joined by pipes; we check each individual command for
1792   # valid parameters
1793   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1794                                export_script, logfile)
1795
1796   comprcmd = "gzip"
1797
1798   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1799                                 destdir, destdir, destfile)
1800   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1801                                                    constants.GANETI_RUNAS,
1802                                                    destcmd)
1803
1804   # all commands have been checked, so we're safe to combine them
1805   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1806
1807   result = utils.RunCmd(command, env=export_env)
1808
1809   if result.failed:
1810     logging.error("os snapshot export command '%s' returned error: %s"
1811                   " output: %s", command, result.fail_reason, result.output)
1812     return False
1813
1814   return True
1815
1816
1817 def FinalizeExport(instance, snap_disks):
1818   """Write out the export configuration information.
1819
1820   @type instance: L{objects.Instance}
1821   @param instance: the instance which we export, used for
1822       saving configuration
1823   @type snap_disks: list of L{objects.Disk}
1824   @param snap_disks: list of snapshot block devices, which
1825       will be used to get the actual name of the dump file
1826
1827   @rtype: boolean
1828   @return: the success of the operation
1829
1830   """
1831   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1832   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1833
1834   config = objects.SerializableConfigParser()
1835
1836   config.add_section(constants.INISECT_EXP)
1837   config.set(constants.INISECT_EXP, 'version', '0')
1838   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1839   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1840   config.set(constants.INISECT_EXP, 'os', instance.os)
1841   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1842
1843   config.add_section(constants.INISECT_INS)
1844   config.set(constants.INISECT_INS, 'name', instance.name)
1845   config.set(constants.INISECT_INS, 'memory', '%d' %
1846              instance.beparams[constants.BE_MEMORY])
1847   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1848              instance.beparams[constants.BE_VCPUS])
1849   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1850
1851   nic_total = 0
1852   for nic_count, nic in enumerate(instance.nics):
1853     nic_total += 1
1854     config.set(constants.INISECT_INS, 'nic%d_mac' %
1855                nic_count, '%s' % nic.mac)
1856     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1857     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1858                '%s' % nic.bridge)
1859   # TODO: redundant: on load can read nics until it doesn't exist
1860   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1861
1862   disk_total = 0
1863   for disk_count, disk in enumerate(snap_disks):
1864     if disk:
1865       disk_total += 1
1866       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1867                  ('%s' % disk.iv_name))
1868       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1869                  ('%s' % disk.physical_id[1]))
1870       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1871                  ('%d' % disk.size))
1872
1873   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1874
1875   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1876                   data=config.Dumps())
1877   shutil.rmtree(finaldestdir, True)
1878   shutil.move(destdir, finaldestdir)
1879
1880   return True
1881
1882
1883 def ExportInfo(dest):
1884   """Get export configuration information.
1885
1886   @type dest: str
1887   @param dest: directory containing the export
1888
1889   @rtype: L{objects.SerializableConfigParser}
1890   @return: a serializable config file containing the
1891       export info
1892
1893   """
1894   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1895
1896   config = objects.SerializableConfigParser()
1897   config.read(cff)
1898
1899   if (not config.has_section(constants.INISECT_EXP) or
1900       not config.has_section(constants.INISECT_INS)):
1901     return None
1902
1903   return config
1904
1905
1906 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1907   """Import an os image into an instance.
1908
1909   @type instance: L{objects.Instance}
1910   @param instance: instance to import the disks into
1911   @type src_node: string
1912   @param src_node: source node for the disk images
1913   @type src_images: list of string
1914   @param src_images: absolute paths of the disk images
1915   @rtype: list of boolean
1916   @return: each boolean represent the success of importing the n-th disk
1917
1918   """
1919   import_env = OSEnvironment(instance)
1920   inst_os = OSFromDisk(instance.os)
1921   import_script = inst_os.import_script
1922
1923   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1924                                         instance.name, int(time.time()))
1925   if not os.path.exists(constants.LOG_OS_DIR):
1926     os.mkdir(constants.LOG_OS_DIR, 0750)
1927
1928   comprcmd = "gunzip"
1929   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1930                                import_script, logfile)
1931
1932   final_result = []
1933   for idx, image in enumerate(src_images):
1934     if image:
1935       destcmd = utils.BuildShellCmd('cat %s', image)
1936       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1937                                                        constants.GANETI_RUNAS,
1938                                                        destcmd)
1939       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1940       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1941       import_env['IMPORT_INDEX'] = str(idx)
1942       result = utils.RunCmd(command, env=import_env)
1943       if result.failed:
1944         logging.error("Disk import command '%s' returned error: %s"
1945                       " output: %s", command, result.fail_reason,
1946                       result.output)
1947         final_result.append(False)
1948       else:
1949         final_result.append(True)
1950     else:
1951       final_result.append(True)
1952
1953   return final_result
1954
1955
1956 def ListExports():
1957   """Return a list of exports currently available on this machine.
1958
1959   @rtype: list
1960   @return: list of the exports
1961
1962   """
1963   if os.path.isdir(constants.EXPORT_DIR):
1964     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1965   else:
1966     return []
1967
1968
1969 def RemoveExport(export):
1970   """Remove an existing export from the node.
1971
1972   @type export: str
1973   @param export: the name of the export to remove
1974   @rtype: boolean
1975   @return: the success of the operation
1976
1977   """
1978   target = os.path.join(constants.EXPORT_DIR, export)
1979
1980   shutil.rmtree(target)
1981   # TODO: catch some of the relevant exceptions and provide a pretty
1982   # error message if rmtree fails.
1983
1984   return True
1985
1986
1987 def BlockdevRename(devlist):
1988   """Rename a list of block devices.
1989
1990   @type devlist: list of tuples
1991   @param devlist: list of tuples of the form  (disk,
1992       new_logical_id, new_physical_id); disk is an
1993       L{objects.Disk} object describing the current disk,
1994       and new logical_id/physical_id is the name we
1995       rename it to
1996   @rtype: boolean
1997   @return: True if all renames succeeded, False otherwise
1998
1999   """
2000   result = True
2001   for disk, unique_id in devlist:
2002     dev = _RecursiveFindBD(disk)
2003     if dev is None:
2004       result = False
2005       continue
2006     try:
2007       old_rpath = dev.dev_path
2008       dev.Rename(unique_id)
2009       new_rpath = dev.dev_path
2010       if old_rpath != new_rpath:
2011         DevCacheManager.RemoveCache(old_rpath)
2012         # FIXME: we should add the new cache information here, like:
2013         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2014         # but we don't have the owner here - maybe parse from existing
2015         # cache? for now, we only lose lvm data when we rename, which
2016         # is less critical than DRBD or MD
2017     except errors.BlockDeviceError, err:
2018       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2019       result = False
2020   return result
2021
2022
2023 def _TransformFileStorageDir(file_storage_dir):
2024   """Checks whether given file_storage_dir is valid.
2025
2026   Checks wheter the given file_storage_dir is within the cluster-wide
2027   default file_storage_dir stored in SimpleStore. Only paths under that
2028   directory are allowed.
2029
2030   @type file_storage_dir: str
2031   @param file_storage_dir: the path to check
2032
2033   @return: the normalized path if valid, None otherwise
2034
2035   """
2036   cfg = _GetConfig()
2037   file_storage_dir = os.path.normpath(file_storage_dir)
2038   base_file_storage_dir = cfg.GetFileStorageDir()
2039   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2040       base_file_storage_dir):
2041     logging.error("file storage directory '%s' is not under base file"
2042                   " storage directory '%s'",
2043                   file_storage_dir, base_file_storage_dir)
2044     return None
2045   return file_storage_dir
2046
2047
2048 def CreateFileStorageDir(file_storage_dir):
2049   """Create file storage directory.
2050
2051   @type file_storage_dir: str
2052   @param file_storage_dir: directory to create
2053
2054   @rtype: tuple
2055   @return: tuple with first element a boolean indicating wheter dir
2056       creation was successful or not
2057
2058   """
2059   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2060   result = True,
2061   if not file_storage_dir:
2062     result = False,
2063   else:
2064     if os.path.exists(file_storage_dir):
2065       if not os.path.isdir(file_storage_dir):
2066         logging.error("'%s' is not a directory", file_storage_dir)
2067         result = False,
2068     else:
2069       try:
2070         os.makedirs(file_storage_dir, 0750)
2071       except OSError, err:
2072         logging.error("Cannot create file storage directory '%s': %s",
2073                       file_storage_dir, err)
2074         result = False,
2075   return result
2076
2077
2078 def RemoveFileStorageDir(file_storage_dir):
2079   """Remove file storage directory.
2080
2081   Remove it only if it's empty. If not log an error and return.
2082
2083   @type file_storage_dir: str
2084   @param file_storage_dir: the directory we should cleanup
2085   @rtype: tuple (success,)
2086   @return: tuple of one element, C{success}, denoting
2087       whether the operation was successfull
2088
2089   """
2090   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2091   result = True,
2092   if not file_storage_dir:
2093     result = False,
2094   else:
2095     if os.path.exists(file_storage_dir):
2096       if not os.path.isdir(file_storage_dir):
2097         logging.error("'%s' is not a directory", file_storage_dir)
2098         result = False,
2099       # deletes dir only if empty, otherwise we want to return False
2100       try:
2101         os.rmdir(file_storage_dir)
2102       except OSError, err:
2103         logging.exception("Cannot remove file storage directory '%s'",
2104                           file_storage_dir)
2105         result = False,
2106   return result
2107
2108
2109 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2110   """Rename the file storage directory.
2111
2112   @type old_file_storage_dir: str
2113   @param old_file_storage_dir: the current path
2114   @type new_file_storage_dir: str
2115   @param new_file_storage_dir: the name we should rename to
2116   @rtype: tuple (success,)
2117   @return: tuple of one element, C{success}, denoting
2118       whether the operation was successful
2119
2120   """
2121   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2122   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2123   result = True,
2124   if not old_file_storage_dir or not new_file_storage_dir:
2125     result = False,
2126   else:
2127     if not os.path.exists(new_file_storage_dir):
2128       if os.path.isdir(old_file_storage_dir):
2129         try:
2130           os.rename(old_file_storage_dir, new_file_storage_dir)
2131         except OSError, err:
2132           logging.exception("Cannot rename '%s' to '%s'",
2133                             old_file_storage_dir, new_file_storage_dir)
2134           result =  False,
2135       else:
2136         logging.error("'%s' is not a directory", old_file_storage_dir)
2137         result = False,
2138     else:
2139       if os.path.exists(old_file_storage_dir):
2140         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2141                       old_file_storage_dir, new_file_storage_dir)
2142         result = False,
2143   return result
2144
2145
2146 def _IsJobQueueFile(file_name):
2147   """Checks whether the given filename is in the queue directory.
2148
2149   @type file_name: str
2150   @param file_name: the file name we should check
2151   @rtype: boolean
2152   @return: whether the file is under the queue directory
2153
2154   """
2155   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2156   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2157
2158   if not result:
2159     logging.error("'%s' is not a file in the queue directory",
2160                   file_name)
2161
2162   return result
2163
2164
2165 def JobQueueUpdate(file_name, content):
2166   """Updates a file in the queue directory.
2167
2168   This is just a wrapper over L{utils.WriteFile}, with proper
2169   checking.
2170
2171   @type file_name: str
2172   @param file_name: the job file name
2173   @type content: str
2174   @param content: the new job contents
2175   @rtype: boolean
2176   @return: the success of the operation
2177
2178   """
2179   if not _IsJobQueueFile(file_name):
2180     return False
2181
2182   # Write and replace the file atomically
2183   utils.WriteFile(file_name, data=_Decompress(content))
2184
2185   return True
2186
2187
2188 def JobQueueRename(old, new):
2189   """Renames a job queue file.
2190
2191   This is just a wrapper over os.rename with proper checking.
2192
2193   @type old: str
2194   @param old: the old (actual) file name
2195   @type new: str
2196   @param new: the desired file name
2197   @rtype: boolean
2198   @return: the success of the operation
2199
2200   """
2201   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2202     return False
2203
2204   utils.RenameFile(old, new, mkdir=True)
2205
2206   return True
2207
2208
2209 def JobQueueSetDrainFlag(drain_flag):
2210   """Set the drain flag for the queue.
2211
2212   This will set or unset the queue drain flag.
2213
2214   @type drain_flag: boolean
2215   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2216   @rtype: boolean
2217   @return: always True
2218   @warning: the function always returns True
2219
2220   """
2221   if drain_flag:
2222     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2223   else:
2224     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2225
2226   return True
2227
2228
2229 def BlockdevClose(instance_name, disks):
2230   """Closes the given block devices.
2231
2232   This means they will be switched to secondary mode (in case of
2233   DRBD).
2234
2235   @param instance_name: if the argument is not empty, the symlinks
2236       of this instance will be removed
2237   @type disks: list of L{objects.Disk}
2238   @param disks: the list of disks to be closed
2239   @rtype: tuple (success, message)
2240   @return: a tuple of success and message, where success
2241       indicates the succes of the operation, and message
2242       which will contain the error details in case we
2243       failed
2244
2245   """
2246   bdevs = []
2247   for cf in disks:
2248     rd = _RecursiveFindBD(cf)
2249     if rd is None:
2250       return (False, "Can't find device %s" % cf)
2251     bdevs.append(rd)
2252
2253   msg = []
2254   for rd in bdevs:
2255     try:
2256       rd.Close()
2257     except errors.BlockDeviceError, err:
2258       msg.append(str(err))
2259   if msg:
2260     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2261   else:
2262     if instance_name:
2263       _RemoveBlockDevLinks(instance_name, disks)
2264     return (True, "All devices secondary")
2265
2266
2267 def ValidateHVParams(hvname, hvparams):
2268   """Validates the given hypervisor parameters.
2269
2270   @type hvname: string
2271   @param hvname: the hypervisor name
2272   @type hvparams: dict
2273   @param hvparams: the hypervisor parameters to be validated
2274   @rtype: tuple (success, message)
2275   @return: a tuple of success and message, where success
2276       indicates the succes of the operation, and message
2277       which will contain the error details in case we
2278       failed
2279
2280   """
2281   try:
2282     hv_type = hypervisor.GetHypervisor(hvname)
2283     hv_type.ValidateParameters(hvparams)
2284     return (True, "Validation passed")
2285   except errors.HypervisorError, err:
2286     return (False, str(err))
2287
2288
2289 def DemoteFromMC():
2290   """Demotes the current node from master candidate role.
2291
2292   """
2293   # try to ensure we're not the master by mistake
2294   master, myself = ssconf.GetMasterAndMyself()
2295   if master == myself:
2296     return (False, "ssconf status shows I'm the master node, will not demote")
2297   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2298   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2299     return (False, "The master daemon is running, will not demote")
2300   try:
2301     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2302   except EnvironmentError, err:
2303     if err.errno != errno.ENOENT:
2304       return (False, "Error while backing up cluster file: %s" % str(err))
2305   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2306   return (True, "Done")
2307
2308
2309 def _FindDisks(nodes_ip, disks):
2310   """Sets the physical ID on disks and returns the block devices.
2311
2312   """
2313   # set the correct physical ID
2314   my_name = utils.HostInfo().name
2315   for cf in disks:
2316     cf.SetPhysicalID(my_name, nodes_ip)
2317
2318   bdevs = []
2319
2320   for cf in disks:
2321     rd = _RecursiveFindBD(cf)
2322     if rd is None:
2323       return (False, "Can't find device %s" % cf)
2324     bdevs.append(rd)
2325   return (True, bdevs)
2326
2327
2328 def DrbdDisconnectNet(nodes_ip, disks):
2329   """Disconnects the network on a list of drbd devices.
2330
2331   """
2332   status, bdevs = _FindDisks(nodes_ip, disks)
2333   if not status:
2334     return status, bdevs
2335
2336   # disconnect disks
2337   for rd in bdevs:
2338     try:
2339       rd.DisconnectNet()
2340     except errors.BlockDeviceError, err:
2341       logging.exception("Failed to go into standalone mode")
2342       return (False, "Can't change network configuration: %s" % str(err))
2343   return (True, "All disks are now disconnected")
2344
2345
2346 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2347   """Attaches the network on a list of drbd devices.
2348
2349   """
2350   status, bdevs = _FindDisks(nodes_ip, disks)
2351   if not status:
2352     return status, bdevs
2353
2354   if multimaster:
2355     for idx, rd in enumerate(bdevs):
2356       try:
2357         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2358       except EnvironmentError, err:
2359         return (False, "Can't create symlink: %s" % str(err))
2360   # reconnect disks, switch to new master configuration and if
2361   # needed primary mode
2362   for rd in bdevs:
2363     try:
2364       rd.AttachNet(multimaster)
2365     except errors.BlockDeviceError, err:
2366       return (False, "Can't change network configuration: %s" % str(err))
2367   # wait until the disks are connected; we need to retry the re-attach
2368   # if the device becomes standalone, as this might happen if the one
2369   # node disconnects and reconnects in a different mode before the
2370   # other node reconnects; in this case, one or both of the nodes will
2371   # decide it has wrong configuration and switch to standalone
2372   RECONNECT_TIMEOUT = 2 * 60
2373   sleep_time = 0.100 # start with 100 miliseconds
2374   timeout_limit = time.time() + RECONNECT_TIMEOUT
2375   while time.time() < timeout_limit:
2376     all_connected = True
2377     for rd in bdevs:
2378       stats = rd.GetProcStatus()
2379       if not (stats.is_connected or stats.is_in_resync):
2380         all_connected = False
2381       if stats.is_standalone:
2382         # peer had different config info and this node became
2383         # standalone, even though this should not happen with the
2384         # new staged way of changing disk configs
2385         try:
2386           rd.ReAttachNet(multimaster)
2387         except errors.BlockDeviceError, err:
2388           return (False, "Can't change network configuration: %s" % str(err))
2389     if all_connected:
2390       break
2391     time.sleep(sleep_time)
2392     sleep_time = min(5, sleep_time * 1.5)
2393   if not all_connected:
2394     return (False, "Timeout in disk reconnecting")
2395   if multimaster:
2396     # change to primary mode
2397     for rd in bdevs:
2398       try:
2399         rd.Open()
2400       except errors.BlockDeviceError, err:
2401         return (False, "Can't change to primary mode: %s" % str(err))
2402   if multimaster:
2403     msg = "multi-master and primary"
2404   else:
2405     msg = "single-master"
2406   return (True, "Disks are now configured as %s" % msg)
2407
2408
2409 def DrbdWaitSync(nodes_ip, disks):
2410   """Wait until DRBDs have synchronized.
2411
2412   """
2413   status, bdevs = _FindDisks(nodes_ip, disks)
2414   if not status:
2415     return status, bdevs
2416
2417   min_resync = 100
2418   alldone = True
2419   failure = False
2420   for rd in bdevs:
2421     stats = rd.GetProcStatus()
2422     if not (stats.is_connected or stats.is_in_resync):
2423       failure = True
2424       break
2425     alldone = alldone and (not stats.is_in_resync)
2426     if stats.sync_percent is not None:
2427       min_resync = min(min_resync, stats.sync_percent)
2428   return (not failure, (alldone, min_resync))
2429
2430
2431 class HooksRunner(object):
2432   """Hook runner.
2433
2434   This class is instantiated on the node side (ganeti-noded) and not
2435   on the master side.
2436
2437   """
2438   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2439
2440   def __init__(self, hooks_base_dir=None):
2441     """Constructor for hooks runner.
2442
2443     @type hooks_base_dir: str or None
2444     @param hooks_base_dir: if not None, this overrides the
2445         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2446
2447     """
2448     if hooks_base_dir is None:
2449       hooks_base_dir = constants.HOOKS_BASE_DIR
2450     self._BASE_DIR = hooks_base_dir
2451
2452   @staticmethod
2453   def ExecHook(script, env):
2454     """Exec one hook script.
2455
2456     @type script: str
2457     @param script: the full path to the script
2458     @type env: dict
2459     @param env: the environment with which to exec the script
2460     @rtype: tuple (success, message)
2461     @return: a tuple of success and message, where success
2462         indicates the succes of the operation, and message
2463         which will contain the error details in case we
2464         failed
2465
2466     """
2467     # exec the process using subprocess and log the output
2468     fdstdin = None
2469     try:
2470       fdstdin = open("/dev/null", "r")
2471       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2472                                stderr=subprocess.STDOUT, close_fds=True,
2473                                shell=False, cwd="/", env=env)
2474       output = ""
2475       try:
2476         output = child.stdout.read(4096)
2477         child.stdout.close()
2478       except EnvironmentError, err:
2479         output += "Hook script error: %s" % str(err)
2480
2481       while True:
2482         try:
2483           result = child.wait()
2484           break
2485         except EnvironmentError, err:
2486           if err.errno == errno.EINTR:
2487             continue
2488           raise
2489     finally:
2490       # try not to leak fds
2491       for fd in (fdstdin, ):
2492         if fd is not None:
2493           try:
2494             fd.close()
2495           except EnvironmentError, err:
2496             # just log the error
2497             #logging.exception("Error while closing fd %s", fd)
2498             pass
2499
2500     return result == 0, utils.SafeEncode(output.strip())
2501
2502   def RunHooks(self, hpath, phase, env):
2503     """Run the scripts in the hooks directory.
2504
2505     @type hpath: str
2506     @param hpath: the path to the hooks directory which
2507         holds the scripts
2508     @type phase: str
2509     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2510         L{constants.HOOKS_PHASE_POST}
2511     @type env: dict
2512     @param env: dictionary with the environment for the hook
2513     @rtype: list
2514     @return: list of 3-element tuples:
2515       - script path
2516       - script result, either L{constants.HKR_SUCCESS} or
2517         L{constants.HKR_FAIL}
2518       - output of the script
2519
2520     @raise errors.ProgrammerError: for invalid input
2521         parameters
2522
2523     """
2524     if phase == constants.HOOKS_PHASE_PRE:
2525       suffix = "pre"
2526     elif phase == constants.HOOKS_PHASE_POST:
2527       suffix = "post"
2528     else:
2529       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2530     rr = []
2531
2532     subdir = "%s-%s.d" % (hpath, suffix)
2533     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2534     try:
2535       dir_contents = utils.ListVisibleFiles(dir_name)
2536     except OSError, err:
2537       # FIXME: must log output in case of failures
2538       return rr
2539
2540     # we use the standard python sort order,
2541     # so 00name is the recommended naming scheme
2542     dir_contents.sort()
2543     for relname in dir_contents:
2544       fname = os.path.join(dir_name, relname)
2545       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2546           self.RE_MASK.match(relname) is not None):
2547         rrval = constants.HKR_SKIP
2548         output = ""
2549       else:
2550         result, output = self.ExecHook(fname, env)
2551         if not result:
2552           rrval = constants.HKR_FAIL
2553         else:
2554           rrval = constants.HKR_SUCCESS
2555       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2556
2557     return rr
2558
2559
2560 class IAllocatorRunner(object):
2561   """IAllocator runner.
2562
2563   This class is instantiated on the node side (ganeti-noded) and not on
2564   the master side.
2565
2566   """
2567   def Run(self, name, idata):
2568     """Run an iallocator script.
2569
2570     @type name: str
2571     @param name: the iallocator script name
2572     @type idata: str
2573     @param idata: the allocator input data
2574
2575     @rtype: tuple
2576     @return: four element tuple of:
2577        - run status (one of the IARUN_ constants)
2578        - stdout
2579        - stderr
2580        - fail reason (as from L{utils.RunResult})
2581
2582     """
2583     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2584                                   os.path.isfile)
2585     if alloc_script is None:
2586       return (constants.IARUN_NOTFOUND, None, None, None)
2587
2588     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2589     try:
2590       os.write(fd, idata)
2591       os.close(fd)
2592       result = utils.RunCmd([alloc_script, fin_name])
2593       if result.failed:
2594         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2595                 result.fail_reason)
2596     finally:
2597       os.unlink(fin_name)
2598
2599     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2600
2601
2602 class DevCacheManager(object):
2603   """Simple class for managing a cache of block device information.
2604
2605   """
2606   _DEV_PREFIX = "/dev/"
2607   _ROOT_DIR = constants.BDEV_CACHE_DIR
2608
2609   @classmethod
2610   def _ConvertPath(cls, dev_path):
2611     """Converts a /dev/name path to the cache file name.
2612
2613     This replaces slashes with underscores and strips the /dev
2614     prefix. It then returns the full path to the cache file.
2615
2616     @type dev_path: str
2617     @param dev_path: the C{/dev/} path name
2618     @rtype: str
2619     @return: the converted path name
2620
2621     """
2622     if dev_path.startswith(cls._DEV_PREFIX):
2623       dev_path = dev_path[len(cls._DEV_PREFIX):]
2624     dev_path = dev_path.replace("/", "_")
2625     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2626     return fpath
2627
2628   @classmethod
2629   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2630     """Updates the cache information for a given device.
2631
2632     @type dev_path: str
2633     @param dev_path: the pathname of the device
2634     @type owner: str
2635     @param owner: the owner (instance name) of the device
2636     @type on_primary: bool
2637     @param on_primary: whether this is the primary
2638         node nor not
2639     @type iv_name: str
2640     @param iv_name: the instance-visible name of the
2641         device, as in objects.Disk.iv_name
2642
2643     @rtype: None
2644
2645     """
2646     if dev_path is None:
2647       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2648       return
2649     fpath = cls._ConvertPath(dev_path)
2650     if on_primary:
2651       state = "primary"
2652     else:
2653       state = "secondary"
2654     if iv_name is None:
2655       iv_name = "not_visible"
2656     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2657     try:
2658       utils.WriteFile(fpath, data=fdata)
2659     except EnvironmentError, err:
2660       logging.exception("Can't update bdev cache for %s", dev_path)
2661
2662   @classmethod
2663   def RemoveCache(cls, dev_path):
2664     """Remove data for a dev_path.
2665
2666     This is just a wrapper over L{utils.RemoveFile} with a converted
2667     path name and logging.
2668
2669     @type dev_path: str
2670     @param dev_path: the pathname of the device
2671
2672     @rtype: None
2673
2674     """
2675     if dev_path is None:
2676       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2677       return
2678     fpath = cls._ConvertPath(dev_path)
2679     try:
2680       utils.RemoveFile(fpath)
2681     except EnvironmentError, err:
2682       logging.exception("Can't update bdev cache for %s", dev_path)