Forward-port DrbdNetReconfig
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     logging.exception("Error while processing user ssh files")
264     return False
265
266   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267     utils.WriteFile(name, data=content, mode=0600)
268
269   utils.AddAuthorizedKey(auth_keys, sshpub)
270
271   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272
273   return True
274
275
276 def LeaveCluster():
277   """Cleans up and remove the current node.
278
279   This function cleans up and prepares the current node to be removed
280   from the cluster.
281
282   If processing is successful, then it raises an
283   L{errors.QuitGanetiException} which is used as a special case to
284   shutdown the node daemon.
285
286   """
287   _CleanDirectory(constants.DATA_DIR)
288   JobQueuePurge()
289
290   try:
291     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292   except errors.OpExecError:
293     logging.exception("Error while processing ssh files")
294     return
295
296   f = open(pub_key, 'r')
297   try:
298     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
299   finally:
300     f.close()
301
302   utils.RemoveFile(priv_key)
303   utils.RemoveFile(pub_key)
304
305   # Return a reassuring string to the caller, and quit
306   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307
308
309 def GetNodeInfo(vgname, hypervisor_type):
310   """Gives back a hash with different informations about the node.
311
312   @type vgname: C{string}
313   @param vgname: the name of the volume group to ask for disk space information
314   @type hypervisor_type: C{str}
315   @param hypervisor_type: the name of the hypervisor to ask for
316       memory information
317   @rtype: C{dict}
318   @return: dictionary with the following keys:
319       - vg_size is the size of the configured volume group in MiB
320       - vg_free is the free size of the volume group in MiB
321       - memory_dom0 is the memory allocated for domain0 in MiB
322       - memory_free is the currently available (free) ram in MiB
323       - memory_total is the total number of ram in MiB
324
325   """
326   outputarray = {}
327   vginfo = _GetVGInfo(vgname)
328   outputarray['vg_size'] = vginfo['vg_size']
329   outputarray['vg_free'] = vginfo['vg_free']
330
331   hyper = hypervisor.GetHypervisor(hypervisor_type)
332   hyp_info = hyper.GetNodeInfo()
333   if hyp_info is not None:
334     outputarray.update(hyp_info)
335
336   f = open("/proc/sys/kernel/random/boot_id", 'r')
337   try:
338     outputarray["bootid"] = f.read(128).rstrip("\n")
339   finally:
340     f.close()
341
342   return outputarray
343
344
345 def VerifyNode(what, cluster_name):
346   """Verify the status of the local node.
347
348   Based on the input L{what} parameter, various checks are done on the
349   local node.
350
351   If the I{filelist} key is present, this list of
352   files is checksummed and the file/checksum pairs are returned.
353
354   If the I{nodelist} key is present, we check that we have
355   connectivity via ssh with the target nodes (and check the hostname
356   report).
357
358   If the I{node-net-test} key is present, we check that we have
359   connectivity to the given nodes via both primary IP and, if
360   applicable, secondary IPs.
361
362   @type what: C{dict}
363   @param what: a dictionary of things to check:
364       - filelist: list of files for which to compute checksums
365       - nodelist: list of nodes we should check ssh communication with
366       - node-net-test: list of nodes we should check node daemon port
367         connectivity with
368       - hypervisor: list with hypervisors to run the verify for
369   @rtype: dict
370   @return: a dictionary with the same keys as the input dict, and
371       values representing the result of the checks
372
373   """
374   result = {}
375
376   if constants.NV_HYPERVISOR in what:
377     result[constants.NV_HYPERVISOR] = tmp = {}
378     for hv_name in what[constants.NV_HYPERVISOR]:
379       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
380
381   if constants.NV_FILELIST in what:
382     result[constants.NV_FILELIST] = utils.FingerprintFiles(
383       what[constants.NV_FILELIST])
384
385   if constants.NV_NODELIST in what:
386     result[constants.NV_NODELIST] = tmp = {}
387     random.shuffle(what[constants.NV_NODELIST])
388     for node in what[constants.NV_NODELIST]:
389       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
390       if not success:
391         tmp[node] = message
392
393   if constants.NV_NODENETTEST in what:
394     result[constants.NV_NODENETTEST] = tmp = {}
395     my_name = utils.HostInfo().name
396     my_pip = my_sip = None
397     for name, pip, sip in what[constants.NV_NODENETTEST]:
398       if name == my_name:
399         my_pip = pip
400         my_sip = sip
401         break
402     if not my_pip:
403       tmp[my_name] = ("Can't find my own primary/secondary IP"
404                       " in the node list")
405     else:
406       port = utils.GetNodeDaemonPort()
407       for name, pip, sip in what[constants.NV_NODENETTEST]:
408         fail = []
409         if not utils.TcpPing(pip, port, source=my_pip):
410           fail.append("primary")
411         if sip != pip:
412           if not utils.TcpPing(sip, port, source=my_sip):
413             fail.append("secondary")
414         if fail:
415           tmp[name] = ("failure using the %s interface(s)" %
416                        " and ".join(fail))
417
418   if constants.NV_LVLIST in what:
419     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
420
421   if constants.NV_INSTANCELIST in what:
422     result[constants.NV_INSTANCELIST] = GetInstanceList(
423       what[constants.NV_INSTANCELIST])
424
425   if constants.NV_VGLIST in what:
426     result[constants.NV_VGLIST] = ListVolumeGroups()
427
428   if constants.NV_VERSION in what:
429     result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
430
431   if constants.NV_HVINFO in what:
432     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
433     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
434
435   return result
436
437
438 def GetVolumeList(vg_name):
439   """Compute list of logical volumes and their size.
440
441   @type vg_name: str
442   @param vg_name: the volume group whose LVs we should list
443   @rtype: dict
444   @return:
445       dictionary of all partions (key) with value being a tuple of
446       their size (in MiB), inactive and online status::
447
448         {'test1': ('20.06', True, True)}
449
450       in case of errors, a string is returned with the error
451       details.
452
453   """
454   lvs = {}
455   sep = '|'
456   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
457                          "--separator=%s" % sep,
458                          "-olv_name,lv_size,lv_attr", vg_name])
459   if result.failed:
460     logging.error("Failed to list logical volumes, lvs output: %s",
461                   result.output)
462     return result.output
463
464   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
465   for line in result.stdout.splitlines():
466     line = line.strip()
467     match = valid_line_re.match(line)
468     if not match:
469       logging.error("Invalid line returned from lvs output: '%s'", line)
470       continue
471     name, size, attr = match.groups()
472     inactive = attr[4] == '-'
473     online = attr[5] == 'o'
474     lvs[name] = (size, inactive, online)
475
476   return lvs
477
478
479 def ListVolumeGroups():
480   """List the volume groups and their size.
481
482   @rtype: dict
483   @return: dictionary with keys volume name and values the
484       size of the volume
485
486   """
487   return utils.ListVolumeGroups()
488
489
490 def NodeVolumes():
491   """List all volumes on this node.
492
493   @rtype: list
494   @return:
495     A list of dictionaries, each having four keys:
496       - name: the logical volume name,
497       - size: the size of the logical volume
498       - dev: the physical device on which the LV lives
499       - vg: the volume group to which it belongs
500
501     In case of errors, we return an empty list and log the
502     error.
503
504     Note that since a logical volume can live on multiple physical
505     volumes, the resulting list might include a logical volume
506     multiple times.
507
508   """
509   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
510                          "--separator=|",
511                          "--options=lv_name,lv_size,devices,vg_name"])
512   if result.failed:
513     logging.error("Failed to list logical volumes, lvs output: %s",
514                   result.output)
515     return []
516
517   def parse_dev(dev):
518     if '(' in dev:
519       return dev.split('(')[0]
520     else:
521       return dev
522
523   def map_line(line):
524     return {
525       'name': line[0].strip(),
526       'size': line[1].strip(),
527       'dev': parse_dev(line[2].strip()),
528       'vg': line[3].strip(),
529     }
530
531   return [map_line(line.split('|')) for line in result.stdout.splitlines()
532           if line.count('|') >= 3]
533
534
535 def BridgesExist(bridges_list):
536   """Check if a list of bridges exist on the current node.
537
538   @rtype: boolean
539   @return: C{True} if all of them exist, C{False} otherwise
540
541   """
542   for bridge in bridges_list:
543     if not utils.BridgeExists(bridge):
544       return False
545
546   return True
547
548
549 def GetInstanceList(hypervisor_list):
550   """Provides a list of instances.
551
552   @type hypervisor_list: list
553   @param hypervisor_list: the list of hypervisors to query information
554
555   @rtype: list
556   @return: a list of all running instances on the current node
557     - instance1.example.com
558     - instance2.example.com
559
560   """
561   results = []
562   for hname in hypervisor_list:
563     try:
564       names = hypervisor.GetHypervisor(hname).ListInstances()
565       results.extend(names)
566     except errors.HypervisorError, err:
567       logging.exception("Error enumerating instances for hypevisor %s", hname)
568       # FIXME: should we somehow not propagate this to the master?
569       raise
570
571   return results
572
573
574 def GetInstanceInfo(instance, hname):
575   """Gives back the informations about an instance as a dictionary.
576
577   @type instance: string
578   @param instance: the instance name
579   @type hname: string
580   @param hname: the hypervisor type of the instance
581
582   @rtype: dict
583   @return: dictionary with the following keys:
584       - memory: memory size of instance (int)
585       - state: xen state of instance (string)
586       - time: cpu time of instance (float)
587
588   """
589   output = {}
590
591   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
592   if iinfo is not None:
593     output['memory'] = iinfo[2]
594     output['state'] = iinfo[4]
595     output['time'] = iinfo[5]
596
597   return output
598
599
600 def GetInstanceMigratable(instance):
601   """Gives whether an instance can be migrated.
602
603   @type instance: L{objects.Instance}
604   @param instance: object representing the instance to be checked.
605
606   @rtype: tuple
607   @return: tuple of (result, description) where:
608       - result: whether the instance can be migrated or not
609       - description: a description of the issue, if relevant
610
611   """
612   hyper = hypervisor.GetHypervisor(instance.hypervisor)
613   if instance.name not in hyper.ListInstances():
614     return (False, 'not running')
615
616   for idx in range(len(instance.disks)):
617     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
618     if not os.path.islink(link_name):
619       return (False, 'not restarted since ganeti 1.2.5')
620
621   return (True, '')
622
623
624 def GetAllInstancesInfo(hypervisor_list):
625   """Gather data about all instances.
626
627   This is the equivalent of L{GetInstanceInfo}, except that it
628   computes data for all instances at once, thus being faster if one
629   needs data about more than one instance.
630
631   @type hypervisor_list: list
632   @param hypervisor_list: list of hypervisors to query for instance data
633
634   @rtype: dict
635   @return: dictionary of instance: data, with data having the following keys:
636       - memory: memory size of instance (int)
637       - state: xen state of instance (string)
638       - time: cpu time of instance (float)
639       - vcpus: the number of vcpus
640
641   """
642   output = {}
643
644   for hname in hypervisor_list:
645     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
646     if iinfo:
647       for name, inst_id, memory, vcpus, state, times in iinfo:
648         value = {
649           'memory': memory,
650           'vcpus': vcpus,
651           'state': state,
652           'time': times,
653           }
654         if name in output and output[name] != value:
655           raise errors.HypervisorError("Instance %s running duplicate"
656                                        " with different parameters" % name)
657         output[name] = value
658
659   return output
660
661
662 def AddOSToInstance(instance):
663   """Add an OS to an instance.
664
665   @type instance: L{objects.Instance}
666   @param instance: Instance whose OS is to be installed
667   @rtype: boolean
668   @return: the success of the operation
669
670   """
671   inst_os = OSFromDisk(instance.os)
672
673   create_env = OSEnvironment(instance)
674
675   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
676                                      instance.name, int(time.time()))
677
678   result = utils.RunCmd([inst_os.create_script], env=create_env,
679                         cwd=inst_os.path, output=logfile,)
680   if result.failed:
681     logging.error("os create command '%s' returned error: %s, logfile: %s,"
682                   " output: %s", result.cmd, result.fail_reason, logfile,
683                   result.output)
684     return False
685
686   return True
687
688
689 def RunRenameInstance(instance, old_name):
690   """Run the OS rename script for an instance.
691
692   @type instance: L{objects.Instance}
693   @param instance: Instance whose OS is to be installed
694   @type old_name: string
695   @param old_name: previous instance name
696   @rtype: boolean
697   @return: the success of the operation
698
699   """
700   inst_os = OSFromDisk(instance.os)
701
702   rename_env = OSEnvironment(instance)
703   rename_env['OLD_INSTANCE_NAME'] = old_name
704
705   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
706                                            old_name,
707                                            instance.name, int(time.time()))
708
709   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
710                         cwd=inst_os.path, output=logfile)
711
712   if result.failed:
713     logging.error("os create command '%s' returned error: %s output: %s",
714                   result.cmd, result.fail_reason, result.output)
715     return False
716
717   return True
718
719
720 def _GetVGInfo(vg_name):
721   """Get informations about the volume group.
722
723   @type vg_name: str
724   @param vg_name: the volume group which we query
725   @rtype: dict
726   @return:
727     A dictionary with the following keys:
728       - C{vg_size} is the total size of the volume group in MiB
729       - C{vg_free} is the free size of the volume group in MiB
730       - C{pv_count} are the number of physical disks in that VG
731
732     If an error occurs during gathering of data, we return the same dict
733     with keys all set to None.
734
735   """
736   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
737
738   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
739                          "--nosuffix", "--units=m", "--separator=:", vg_name])
740
741   if retval.failed:
742     logging.error("volume group %s not present", vg_name)
743     return retdic
744   valarr = retval.stdout.strip().rstrip(':').split(':')
745   if len(valarr) == 3:
746     try:
747       retdic = {
748         "vg_size": int(round(float(valarr[0]), 0)),
749         "vg_free": int(round(float(valarr[1]), 0)),
750         "pv_count": int(valarr[2]),
751         }
752     except ValueError, err:
753       logging.exception("Fail to parse vgs output")
754   else:
755     logging.error("vgs output has the wrong number of fields (expected"
756                   " three): %s", str(valarr))
757   return retdic
758
759
760 def _GetBlockDevSymlinkPath(instance_name, idx):
761   return os.path.join(constants.DISK_LINKS_DIR,
762                       "%s:%d" % (instance_name, idx))
763
764
765 def _SymlinkBlockDev(instance_name, device_path, idx):
766   """Set up symlinks to a instance's block device.
767
768   This is an auxiliary function run when an instance is start (on the primary
769   node) or when an instance is migrated (on the target node).
770
771
772   @param instance_name: the name of the target instance
773   @param device_path: path of the physical block device, on the node
774   @param idx: the disk index
775   @return: absolute path to the disk's symlink
776
777   """
778   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
779   try:
780     os.symlink(device_path, link_name)
781   except OSError, err:
782     if err.errno == errno.EEXIST:
783       if (not os.path.islink(link_name) or
784           os.readlink(link_name) != device_path):
785         os.remove(link_name)
786         os.symlink(device_path, link_name)
787     else:
788       raise
789
790   return link_name
791
792
793 def _RemoveBlockDevLinks(instance_name, disks):
794   """Remove the block device symlinks belonging to the given instance.
795
796   """
797   for idx, disk in enumerate(disks):
798     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
799     if os.path.islink(link_name):
800       try:
801         os.remove(link_name)
802       except OSError:
803         logging.exception("Can't remove symlink '%s'", link_name)
804
805
806 def _GatherAndLinkBlockDevs(instance):
807   """Set up an instance's block device(s).
808
809   This is run on the primary node at instance startup. The block
810   devices must be already assembled.
811
812   @type instance: L{objects.Instance}
813   @param instance: the instance whose disks we shoul assemble
814   @rtype: list
815   @return: list of (disk_object, device_path)
816
817   """
818   block_devices = []
819   for idx, disk in enumerate(instance.disks):
820     device = _RecursiveFindBD(disk)
821     if device is None:
822       raise errors.BlockDeviceError("Block device '%s' is not set up." %
823                                     str(disk))
824     device.Open()
825     try:
826       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
827     except OSError, e:
828       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
829                                     e.strerror)
830
831     block_devices.append((disk, link_name))
832
833   return block_devices
834
835
836 def StartInstance(instance, extra_args):
837   """Start an instance.
838
839   @type instance: L{objects.Instance}
840   @param instance: the instance object
841   @rtype: boolean
842   @return: whether the startup was successful or not
843
844   """
845   running_instances = GetInstanceList([instance.hypervisor])
846
847   if instance.name in running_instances:
848     return True
849
850   try:
851     block_devices = _GatherAndLinkBlockDevs(instance)
852     hyper = hypervisor.GetHypervisor(instance.hypervisor)
853     hyper.StartInstance(instance, block_devices, extra_args)
854   except errors.BlockDeviceError, err:
855     logging.exception("Failed to start instance")
856     return False
857   except errors.HypervisorError, err:
858     logging.exception("Failed to start instance")
859     _RemoveBlockDevLinks(instance.name, instance.disks)
860     return False
861
862   return True
863
864
865 def ShutdownInstance(instance):
866   """Shut an instance down.
867
868   @note: this functions uses polling with a hardcoded timeout.
869
870   @type instance: L{objects.Instance}
871   @param instance: the instance object
872   @rtype: boolean
873   @return: whether the startup was successful or not
874
875   """
876   hv_name = instance.hypervisor
877   running_instances = GetInstanceList([hv_name])
878
879   if instance.name not in running_instances:
880     return True
881
882   hyper = hypervisor.GetHypervisor(hv_name)
883   try:
884     hyper.StopInstance(instance)
885   except errors.HypervisorError, err:
886     logging.error("Failed to stop instance")
887     return False
888
889   # test every 10secs for 2min
890
891   time.sleep(1)
892   for dummy in range(11):
893     if instance.name not in GetInstanceList([hv_name]):
894       break
895     time.sleep(10)
896   else:
897     # the shutdown did not succeed
898     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
899
900     try:
901       hyper.StopInstance(instance, force=True)
902     except errors.HypervisorError, err:
903       logging.exception("Failed to stop instance")
904       return False
905
906     time.sleep(1)
907     if instance.name in GetInstanceList([hv_name]):
908       logging.error("could not shutdown instance '%s' even by destroy",
909                     instance.name)
910       return False
911
912   _RemoveBlockDevLinks(instance.name, instance.disks)
913
914   return True
915
916
917 def RebootInstance(instance, reboot_type, extra_args):
918   """Reboot an instance.
919
920   @type instance: L{objects.Instance}
921   @param instance: the instance object to reboot
922   @type reboot_type: str
923   @param reboot_type: the type of reboot, one the following
924     constants:
925       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
926         instance OS, do not recreate the VM
927       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
928         restart the VM (at the hypervisor level)
929       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
930         is not accepted here, since that mode is handled
931         differently
932   @rtype: boolean
933   @return: the success of the operation
934
935   """
936   running_instances = GetInstanceList([instance.hypervisor])
937
938   if instance.name not in running_instances:
939     logging.error("Cannot reboot instance that is not running")
940     return False
941
942   hyper = hypervisor.GetHypervisor(instance.hypervisor)
943   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
944     try:
945       hyper.RebootInstance(instance)
946     except errors.HypervisorError, err:
947       logging.exception("Failed to soft reboot instance")
948       return False
949   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
950     try:
951       ShutdownInstance(instance)
952       StartInstance(instance, extra_args)
953     except errors.HypervisorError, err:
954       logging.exception("Failed to hard reboot instance")
955       return False
956   else:
957     raise errors.ParameterError("reboot_type invalid")
958
959   return True
960
961
962 def MigrateInstance(instance, target, live):
963   """Migrates an instance to another node.
964
965   @type instance: L{objects.Instance}
966   @param instance: the instance definition
967   @type target: string
968   @param target: the target node name
969   @type live: boolean
970   @param live: whether the migration should be done live or not (the
971       interpretation of this parameter is left to the hypervisor)
972   @rtype: tuple
973   @return: a tuple of (success, msg) where:
974       - succes is a boolean denoting the success/failure of the operation
975       - msg is a string with details in case of failure
976
977   """
978   hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
979
980   try:
981     hyper.MigrateInstance(instance.name, target, live)
982   except errors.HypervisorError, err:
983     msg = "Failed to migrate instance: %s" % str(err)
984     logging.error(msg)
985     return (False, msg)
986   return (True, "Migration successfull")
987
988
989 def CreateBlockDevice(disk, size, owner, on_primary, info):
990   """Creates a block device for an instance.
991
992   @type disk: L{objects.Disk}
993   @param disk: the object describing the disk we should create
994   @type size: int
995   @param size: the size of the physical underlying device, in MiB
996   @type owner: str
997   @param owner: the name of the instance for which disk is created,
998       used for device cache data
999   @type on_primary: boolean
1000   @param on_primary:  indicates if it is the primary node or not
1001   @type info: string
1002   @param info: string that will be sent to the physical device
1003       creation, used for example to set (LVM) tags on LVs
1004
1005   @return: the new unique_id of the device (this can sometime be
1006       computed only after creation), or None. On secondary nodes,
1007       it's not required to return anything.
1008
1009   """
1010   clist = []
1011   if disk.children:
1012     for child in disk.children:
1013       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1014       if on_primary or disk.AssembleOnSecondary():
1015         # we need the children open in case the device itself has to
1016         # be assembled
1017         crdev.Open()
1018       clist.append(crdev)
1019   try:
1020     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
1021     if device is not None:
1022       logging.info("removing existing device %s", disk)
1023       device.Remove()
1024   except errors.BlockDeviceError, err:
1025     pass
1026
1027   device = bdev.Create(disk.dev_type, disk.physical_id,
1028                        clist, size)
1029   if device is None:
1030     raise ValueError("Can't create child device for %s, %s" %
1031                      (disk, size))
1032   if on_primary or disk.AssembleOnSecondary():
1033     if not device.Assemble():
1034       errorstring = "Can't assemble device after creation"
1035       logging.error(errorstring)
1036       raise errors.BlockDeviceError("%s, very unusual event - check the node"
1037                                     " daemon logs" % errorstring)
1038     device.SetSyncSpeed(constants.SYNC_SPEED)
1039     if on_primary or disk.OpenOnSecondary():
1040       device.Open(force=True)
1041     DevCacheManager.UpdateCache(device.dev_path, owner,
1042                                 on_primary, disk.iv_name)
1043
1044   device.SetInfo(info)
1045
1046   physical_id = device.unique_id
1047   return physical_id
1048
1049
1050 def RemoveBlockDevice(disk):
1051   """Remove a block device.
1052
1053   @note: This is intended to be called recursively.
1054
1055   @type disk: L{objects.Disk}
1056   @param disk: the disk object we should remove
1057   @rtype: boolean
1058   @return: the success of the operation
1059
1060   """
1061   try:
1062     rdev = _RecursiveFindBD(disk)
1063   except errors.BlockDeviceError, err:
1064     # probably can't attach
1065     logging.info("Can't attach to device %s in remove", disk)
1066     rdev = None
1067   if rdev is not None:
1068     r_path = rdev.dev_path
1069     result = rdev.Remove()
1070     if result:
1071       DevCacheManager.RemoveCache(r_path)
1072   else:
1073     result = True
1074   if disk.children:
1075     for child in disk.children:
1076       result = result and RemoveBlockDevice(child)
1077   return result
1078
1079
1080 def _RecursiveAssembleBD(disk, owner, as_primary):
1081   """Activate a block device for an instance.
1082
1083   This is run on the primary and secondary nodes for an instance.
1084
1085   @note: this function is called recursively.
1086
1087   @type disk: L{objects.Disk}
1088   @param disk: the disk we try to assemble
1089   @type owner: str
1090   @param owner: the name of the instance which owns the disk
1091   @type as_primary: boolean
1092   @param as_primary: if we should make the block device
1093       read/write
1094
1095   @return: the assembled device or None (in case no device
1096       was assembled)
1097   @raise errors.BlockDeviceError: in case there is an error
1098       during the activation of the children or the device
1099       itself
1100
1101   """
1102   children = []
1103   if disk.children:
1104     mcn = disk.ChildrenNeeded()
1105     if mcn == -1:
1106       mcn = 0 # max number of Nones allowed
1107     else:
1108       mcn = len(disk.children) - mcn # max number of Nones
1109     for chld_disk in disk.children:
1110       try:
1111         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1112       except errors.BlockDeviceError, err:
1113         if children.count(None) >= mcn:
1114           raise
1115         cdev = None
1116         logging.debug("Error in child activation: %s", str(err))
1117       children.append(cdev)
1118
1119   if as_primary or disk.AssembleOnSecondary():
1120     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1121     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1122     result = r_dev
1123     if as_primary or disk.OpenOnSecondary():
1124       r_dev.Open()
1125     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1126                                 as_primary, disk.iv_name)
1127
1128   else:
1129     result = True
1130   return result
1131
1132
1133 def AssembleBlockDevice(disk, owner, as_primary):
1134   """Activate a block device for an instance.
1135
1136   This is a wrapper over _RecursiveAssembleBD.
1137
1138   @rtype: str or boolean
1139   @return: a C{/dev/...} path for primary nodes, and
1140       C{True} for secondary nodes
1141
1142   """
1143   result = _RecursiveAssembleBD(disk, owner, as_primary)
1144   if isinstance(result, bdev.BlockDev):
1145     result = result.dev_path
1146   return result
1147
1148
1149 def ShutdownBlockDevice(disk):
1150   """Shut down a block device.
1151
1152   First, if the device is assembled (Attach() is successfull), then
1153   the device is shutdown. Then the children of the device are
1154   shutdown.
1155
1156   This function is called recursively. Note that we don't cache the
1157   children or such, as oppossed to assemble, shutdown of different
1158   devices doesn't require that the upper device was active.
1159
1160   @type disk: L{objects.Disk}
1161   @param disk: the description of the disk we should
1162       shutdown
1163   @rtype: boolean
1164   @return: the success of the operation
1165
1166   """
1167   r_dev = _RecursiveFindBD(disk)
1168   if r_dev is not None:
1169     r_path = r_dev.dev_path
1170     result = r_dev.Shutdown()
1171     if result:
1172       DevCacheManager.RemoveCache(r_path)
1173   else:
1174     result = True
1175   if disk.children:
1176     for child in disk.children:
1177       result = result and ShutdownBlockDevice(child)
1178   return result
1179
1180
1181 def MirrorAddChildren(parent_cdev, new_cdevs):
1182   """Extend a mirrored block device.
1183
1184   @type parent_cdev: L{objects.Disk}
1185   @param parent_cdev: the disk to which we should add children
1186   @type new_cdevs: list of L{objects.Disk}
1187   @param new_cdevs: the list of children which we should add
1188   @rtype: boolean
1189   @return: the success of the operation
1190
1191   """
1192   parent_bdev = _RecursiveFindBD(parent_cdev)
1193   if parent_bdev is None:
1194     logging.error("Can't find parent device")
1195     return False
1196   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1197   if new_bdevs.count(None) > 0:
1198     logging.error("Can't find new device(s) to add: %s:%s",
1199                   new_bdevs, new_cdevs)
1200     return False
1201   parent_bdev.AddChildren(new_bdevs)
1202   return True
1203
1204
1205 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1206   """Shrink a mirrored block device.
1207
1208   @type parent_cdev: L{objects.Disk}
1209   @param parent_cdev: the disk from which we should remove children
1210   @type new_cdevs: list of L{objects.Disk}
1211   @param new_cdevs: the list of children which we should remove
1212   @rtype: boolean
1213   @return: the success of the operation
1214
1215   """
1216   parent_bdev = _RecursiveFindBD(parent_cdev)
1217   if parent_bdev is None:
1218     logging.error("Can't find parent in remove children: %s", parent_cdev)
1219     return False
1220   devs = []
1221   for disk in new_cdevs:
1222     rpath = disk.StaticDevPath()
1223     if rpath is None:
1224       bd = _RecursiveFindBD(disk)
1225       if bd is None:
1226         logging.error("Can't find dynamic device %s while removing children",
1227                       disk)
1228         return False
1229       else:
1230         devs.append(bd.dev_path)
1231     else:
1232       devs.append(rpath)
1233   parent_bdev.RemoveChildren(devs)
1234   return True
1235
1236
1237 def GetMirrorStatus(disks):
1238   """Get the mirroring status of a list of devices.
1239
1240   @type disks: list of L{objects.Disk}
1241   @param disks: the list of disks which we should query
1242   @rtype: disk
1243   @return:
1244       a list of (mirror_done, estimated_time) tuples, which
1245       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1246   @raise errors.BlockDeviceError: if any of the disks cannot be
1247       found
1248
1249   """
1250   stats = []
1251   for dsk in disks:
1252     rbd = _RecursiveFindBD(dsk)
1253     if rbd is None:
1254       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1255     stats.append(rbd.CombinedSyncStatus())
1256   return stats
1257
1258
1259 def _RecursiveFindBD(disk):
1260   """Check if a device is activated.
1261
1262   If so, return informations about the real device.
1263
1264   @type disk: L{objects.Disk}
1265   @param disk: the disk object we need to find
1266
1267   @return: None if the device can't be found,
1268       otherwise the device instance
1269
1270   """
1271   children = []
1272   if disk.children:
1273     for chdisk in disk.children:
1274       children.append(_RecursiveFindBD(chdisk))
1275
1276   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1277
1278
1279 def FindBlockDevice(disk):
1280   """Check if a device is activated.
1281
1282   If it is, return informations about the real device.
1283
1284   @type disk: L{objects.Disk}
1285   @param disk: the disk to find
1286   @rtype: None or tuple
1287   @return: None if the disk cannot be found, otherwise a
1288       tuple (device_path, major, minor, sync_percent,
1289       estimated_time, is_degraded)
1290
1291   """
1292   rbd = _RecursiveFindBD(disk)
1293   if rbd is None:
1294     return rbd
1295   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1296
1297
1298 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1299   """Write a file to the filesystem.
1300
1301   This allows the master to overwrite(!) a file. It will only perform
1302   the operation if the file belongs to a list of configuration files.
1303
1304   @type file_name: str
1305   @param file_name: the target file name
1306   @type data: str
1307   @param data: the new contents of the file
1308   @type mode: int
1309   @param mode: the mode to give the file (can be None)
1310   @type uid: int
1311   @param uid: the owner of the file (can be -1 for default)
1312   @type gid: int
1313   @param gid: the group of the file (can be -1 for default)
1314   @type atime: float
1315   @param atime: the atime to set on the file (can be None)
1316   @type mtime: float
1317   @param mtime: the mtime to set on the file (can be None)
1318   @rtype: boolean
1319   @return: the success of the operation; errors are logged
1320       in the node daemon log
1321
1322   """
1323   if not os.path.isabs(file_name):
1324     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1325                   file_name)
1326     return False
1327
1328   allowed_files = [
1329     constants.CLUSTER_CONF_FILE,
1330     constants.ETC_HOSTS,
1331     constants.SSH_KNOWN_HOSTS_FILE,
1332     constants.VNC_PASSWORD_FILE,
1333     ]
1334
1335   if file_name not in allowed_files:
1336     logging.error("Filename passed to UploadFile not in allowed"
1337                  " upload targets: '%s'", file_name)
1338     return False
1339
1340   raw_data = _Decompress(data)
1341
1342   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1343                   atime=atime, mtime=mtime)
1344   return True
1345
1346
1347 def WriteSsconfFiles(values):
1348   """Update all ssconf files.
1349
1350   Wrapper around the SimpleStore.WriteFiles.
1351
1352   """
1353   ssconf.SimpleStore().WriteFiles(values)
1354
1355
1356 def _ErrnoOrStr(err):
1357   """Format an EnvironmentError exception.
1358
1359   If the L{err} argument has an errno attribute, it will be looked up
1360   and converted into a textual C{E...} description. Otherwise the
1361   string representation of the error will be returned.
1362
1363   @type err: L{EnvironmentError}
1364   @param err: the exception to format
1365
1366   """
1367   if hasattr(err, 'errno'):
1368     detail = errno.errorcode[err.errno]
1369   else:
1370     detail = str(err)
1371   return detail
1372
1373
1374 def _OSOndiskVersion(name, os_dir):
1375   """Compute and return the API version of a given OS.
1376
1377   This function will try to read the API version of the OS given by
1378   the 'name' parameter and residing in the 'os_dir' directory.
1379
1380   @type name: str
1381   @param name: the OS name we should look for
1382   @type os_dir: str
1383   @param os_dir: the directory inwhich we should look for the OS
1384   @rtype: int or None
1385   @return:
1386       Either an integer denoting the version or None in the
1387       case when this is not a valid OS name.
1388   @raise errors.InvalidOS: if the OS cannot be found
1389
1390   """
1391   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1392
1393   try:
1394     st = os.stat(api_file)
1395   except EnvironmentError, err:
1396     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1397                            " found (%s)" % _ErrnoOrStr(err))
1398
1399   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1400     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1401                            " a regular file")
1402
1403   try:
1404     f = open(api_file)
1405     try:
1406       api_versions = f.readlines()
1407     finally:
1408       f.close()
1409   except EnvironmentError, err:
1410     raise errors.InvalidOS(name, os_dir, "error while reading the"
1411                            " API version (%s)" % _ErrnoOrStr(err))
1412
1413   api_versions = [version.strip() for version in api_versions]
1414   try:
1415     api_versions = [int(version) for version in api_versions]
1416   except (TypeError, ValueError), err:
1417     raise errors.InvalidOS(name, os_dir,
1418                            "API version is not integer (%s)" % str(err))
1419
1420   return api_versions
1421
1422
1423 def DiagnoseOS(top_dirs=None):
1424   """Compute the validity for all OSes.
1425
1426   @type top_dirs: list
1427   @param top_dirs: the list of directories in which to
1428       search (if not given defaults to
1429       L{constants.OS_SEARCH_PATH})
1430   @rtype: list of L{objects.OS}
1431   @return: an OS object for each name in all the given
1432       directories
1433
1434   """
1435   if top_dirs is None:
1436     top_dirs = constants.OS_SEARCH_PATH
1437
1438   result = []
1439   for dir_name in top_dirs:
1440     if os.path.isdir(dir_name):
1441       try:
1442         f_names = utils.ListVisibleFiles(dir_name)
1443       except EnvironmentError, err:
1444         logging.exception("Can't list the OS directory %s", dir_name)
1445         break
1446       for name in f_names:
1447         try:
1448           os_inst = OSFromDisk(name, base_dir=dir_name)
1449           result.append(os_inst)
1450         except errors.InvalidOS, err:
1451           result.append(objects.OS.FromInvalidOS(err))
1452
1453   return result
1454
1455
1456 def OSFromDisk(name, base_dir=None):
1457   """Create an OS instance from disk.
1458
1459   This function will return an OS instance if the given name is a
1460   valid OS name. Otherwise, it will raise an appropriate
1461   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1462
1463   @type base_dir: string
1464   @keyword base_dir: Base directory containing OS installations.
1465                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1466   @rtype: L{objects.OS}
1467   @return: the OS instance if we find a valid one
1468   @raise errors.InvalidOS: if we don't find a valid OS
1469
1470   """
1471   if base_dir is None:
1472     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1473     if os_dir is None:
1474       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1475   else:
1476     os_dir = os.path.sep.join([base_dir, name])
1477
1478   api_versions = _OSOndiskVersion(name, os_dir)
1479
1480   if constants.OS_API_VERSION not in api_versions:
1481     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1482                            " (found %s want %s)"
1483                            % (api_versions, constants.OS_API_VERSION))
1484
1485   # OS Scripts dictionary, we will populate it with the actual script names
1486   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1487
1488   for script in os_scripts:
1489     os_scripts[script] = os.path.sep.join([os_dir, script])
1490
1491     try:
1492       st = os.stat(os_scripts[script])
1493     except EnvironmentError, err:
1494       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1495                              (script, _ErrnoOrStr(err)))
1496
1497     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1498       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1499                              script)
1500
1501     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1502       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1503                              script)
1504
1505
1506   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1507                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1508                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1509                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1510                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1511                     api_versions=api_versions)
1512
1513 def OSEnvironment(instance, debug=0):
1514   """Calculate the environment for an os script.
1515
1516   @type instance: L{objects.Instance}
1517   @param instance: target instance for the os script run
1518   @type debug: integer
1519   @param debug: debug level (0 or 1, for OS Api 10)
1520   @rtype: dict
1521   @return: dict of environment variables
1522   @raise errors.BlockDeviceError: if the block device
1523       cannot be found
1524
1525   """
1526   result = {}
1527   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1528   result['INSTANCE_NAME'] = instance.name
1529   result['HYPERVISOR'] = instance.hypervisor
1530   result['DISK_COUNT'] = '%d' % len(instance.disks)
1531   result['NIC_COUNT'] = '%d' % len(instance.nics)
1532   result['DEBUG_LEVEL'] = '%d' % debug
1533   for idx, disk in enumerate(instance.disks):
1534     real_disk = _RecursiveFindBD(disk)
1535     if real_disk is None:
1536       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1537                                     str(disk))
1538     real_disk.Open()
1539     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1540     # FIXME: When disks will have read-only mode, populate this
1541     result['DISK_%d_ACCESS' % idx] = 'W'
1542     if constants.HV_DISK_TYPE in instance.hvparams:
1543       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1544         instance.hvparams[constants.HV_DISK_TYPE]
1545     if disk.dev_type in constants.LDS_BLOCK:
1546       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1547     elif disk.dev_type == constants.LD_FILE:
1548       result['DISK_%d_BACKEND_TYPE' % idx] = \
1549         'file:%s' % disk.physical_id[0]
1550   for idx, nic in enumerate(instance.nics):
1551     result['NIC_%d_MAC' % idx] = nic.mac
1552     if nic.ip:
1553       result['NIC_%d_IP' % idx] = nic.ip
1554     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1555     if constants.HV_NIC_TYPE in instance.hvparams:
1556       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1557         instance.hvparams[constants.HV_NIC_TYPE]
1558
1559   return result
1560
1561 def GrowBlockDevice(disk, amount):
1562   """Grow a stack of block devices.
1563
1564   This function is called recursively, with the childrens being the
1565   first ones to resize.
1566
1567   @type disk: L{objects.Disk}
1568   @param disk: the disk to be grown
1569   @rtype: (status, result)
1570   @return: a tuple with the status of the operation
1571       (True/False), and the errors message if status
1572       is False
1573
1574   """
1575   r_dev = _RecursiveFindBD(disk)
1576   if r_dev is None:
1577     return False, "Cannot find block device %s" % (disk,)
1578
1579   try:
1580     r_dev.Grow(amount)
1581   except errors.BlockDeviceError, err:
1582     return False, str(err)
1583
1584   return True, None
1585
1586
1587 def SnapshotBlockDevice(disk):
1588   """Create a snapshot copy of a block device.
1589
1590   This function is called recursively, and the snapshot is actually created
1591   just for the leaf lvm backend device.
1592
1593   @type disk: L{objects.Disk}
1594   @param disk: the disk to be snapshotted
1595   @rtype: string
1596   @return: snapshot disk path
1597
1598   """
1599   if disk.children:
1600     if len(disk.children) == 1:
1601       # only one child, let's recurse on it
1602       return SnapshotBlockDevice(disk.children[0])
1603     else:
1604       # more than one child, choose one that matches
1605       for child in disk.children:
1606         if child.size == disk.size:
1607           # return implies breaking the loop
1608           return SnapshotBlockDevice(child)
1609   elif disk.dev_type == constants.LD_LV:
1610     r_dev = _RecursiveFindBD(disk)
1611     if r_dev is not None:
1612       # let's stay on the safe side and ask for the full size, for now
1613       return r_dev.Snapshot(disk.size)
1614     else:
1615       return None
1616   else:
1617     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1618                                  " '%s' of type '%s'" %
1619                                  (disk.unique_id, disk.dev_type))
1620
1621
1622 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1623   """Export a block device snapshot to a remote node.
1624
1625   @type disk: L{objects.Disk}
1626   @param disk: the description of the disk to export
1627   @type dest_node: str
1628   @param dest_node: the destination node to export to
1629   @type instance: L{objects.Instance}
1630   @param instance: the instance object to whom the disk belongs
1631   @type cluster_name: str
1632   @param cluster_name: the cluster name, needed for SSH hostalias
1633   @type idx: int
1634   @param idx: the index of the disk in the instance's disk list,
1635       used to export to the OS scripts environment
1636   @rtype: boolean
1637   @return: the success of the operation
1638
1639   """
1640   export_env = OSEnvironment(instance)
1641
1642   inst_os = OSFromDisk(instance.os)
1643   export_script = inst_os.export_script
1644
1645   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1646                                      instance.name, int(time.time()))
1647   if not os.path.exists(constants.LOG_OS_DIR):
1648     os.mkdir(constants.LOG_OS_DIR, 0750)
1649   real_disk = _RecursiveFindBD(disk)
1650   if real_disk is None:
1651     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1652                                   str(disk))
1653   real_disk.Open()
1654
1655   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1656   export_env['EXPORT_INDEX'] = str(idx)
1657
1658   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1659   destfile = disk.physical_id[1]
1660
1661   # the target command is built out of three individual commands,
1662   # which are joined by pipes; we check each individual command for
1663   # valid parameters
1664   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1665                                export_script, logfile)
1666
1667   comprcmd = "gzip"
1668
1669   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1670                                 destdir, destdir, destfile)
1671   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1672                                                    constants.GANETI_RUNAS,
1673                                                    destcmd)
1674
1675   # all commands have been checked, so we're safe to combine them
1676   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1677
1678   result = utils.RunCmd(command, env=export_env)
1679
1680   if result.failed:
1681     logging.error("os snapshot export command '%s' returned error: %s"
1682                   " output: %s", command, result.fail_reason, result.output)
1683     return False
1684
1685   return True
1686
1687
1688 def FinalizeExport(instance, snap_disks):
1689   """Write out the export configuration information.
1690
1691   @type instance: L{objects.Instance}
1692   @param instance: the instance which we export, used for
1693       saving configuration
1694   @type snap_disks: list of L{objects.Disk}
1695   @param snap_disks: list of snapshot block devices, which
1696       will be used to get the actual name of the dump file
1697
1698   @rtype: boolean
1699   @return: the success of the operation
1700
1701   """
1702   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1703   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1704
1705   config = objects.SerializableConfigParser()
1706
1707   config.add_section(constants.INISECT_EXP)
1708   config.set(constants.INISECT_EXP, 'version', '0')
1709   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1710   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1711   config.set(constants.INISECT_EXP, 'os', instance.os)
1712   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1713
1714   config.add_section(constants.INISECT_INS)
1715   config.set(constants.INISECT_INS, 'name', instance.name)
1716   config.set(constants.INISECT_INS, 'memory', '%d' %
1717              instance.beparams[constants.BE_MEMORY])
1718   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1719              instance.beparams[constants.BE_VCPUS])
1720   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1721
1722   nic_total = 0
1723   for nic_count, nic in enumerate(instance.nics):
1724     nic_total += 1
1725     config.set(constants.INISECT_INS, 'nic%d_mac' %
1726                nic_count, '%s' % nic.mac)
1727     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1728     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1729                '%s' % nic.bridge)
1730   # TODO: redundant: on load can read nics until it doesn't exist
1731   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1732
1733   disk_total = 0
1734   for disk_count, disk in enumerate(snap_disks):
1735     if disk:
1736       disk_total += 1
1737       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1738                  ('%s' % disk.iv_name))
1739       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1740                  ('%s' % disk.physical_id[1]))
1741       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1742                  ('%d' % disk.size))
1743
1744   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1745
1746   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1747                   data=config.Dumps())
1748   shutil.rmtree(finaldestdir, True)
1749   shutil.move(destdir, finaldestdir)
1750
1751   return True
1752
1753
1754 def ExportInfo(dest):
1755   """Get export configuration information.
1756
1757   @type dest: str
1758   @param dest: directory containing the export
1759
1760   @rtype: L{objects.SerializableConfigParser}
1761   @return: a serializable config file containing the
1762       export info
1763
1764   """
1765   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1766
1767   config = objects.SerializableConfigParser()
1768   config.read(cff)
1769
1770   if (not config.has_section(constants.INISECT_EXP) or
1771       not config.has_section(constants.INISECT_INS)):
1772     return None
1773
1774   return config
1775
1776
1777 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1778   """Import an os image into an instance.
1779
1780   @type instance: L{objects.Instance}
1781   @param instance: instance to import the disks into
1782   @type src_node: string
1783   @param src_node: source node for the disk images
1784   @type src_images: list of string
1785   @param src_images: absolute paths of the disk images
1786   @rtype: list of boolean
1787   @return: each boolean represent the success of importing the n-th disk
1788
1789   """
1790   import_env = OSEnvironment(instance)
1791   inst_os = OSFromDisk(instance.os)
1792   import_script = inst_os.import_script
1793
1794   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1795                                         instance.name, int(time.time()))
1796   if not os.path.exists(constants.LOG_OS_DIR):
1797     os.mkdir(constants.LOG_OS_DIR, 0750)
1798
1799   comprcmd = "gunzip"
1800   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1801                                import_script, logfile)
1802
1803   final_result = []
1804   for idx, image in enumerate(src_images):
1805     if image:
1806       destcmd = utils.BuildShellCmd('cat %s', image)
1807       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1808                                                        constants.GANETI_RUNAS,
1809                                                        destcmd)
1810       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1811       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1812       import_env['IMPORT_INDEX'] = str(idx)
1813       result = utils.RunCmd(command, env=import_env)
1814       if result.failed:
1815         logging.error("Disk import command '%s' returned error: %s"
1816                       " output: %s", command, result.fail_reason,
1817                       result.output)
1818         final_result.append(False)
1819       else:
1820         final_result.append(True)
1821     else:
1822       final_result.append(True)
1823
1824   return final_result
1825
1826
1827 def ListExports():
1828   """Return a list of exports currently available on this machine.
1829
1830   @rtype: list
1831   @return: list of the exports
1832
1833   """
1834   if os.path.isdir(constants.EXPORT_DIR):
1835     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1836   else:
1837     return []
1838
1839
1840 def RemoveExport(export):
1841   """Remove an existing export from the node.
1842
1843   @type export: str
1844   @param export: the name of the export to remove
1845   @rtype: boolean
1846   @return: the success of the operation
1847
1848   """
1849   target = os.path.join(constants.EXPORT_DIR, export)
1850
1851   shutil.rmtree(target)
1852   # TODO: catch some of the relevant exceptions and provide a pretty
1853   # error message if rmtree fails.
1854
1855   return True
1856
1857
1858 def RenameBlockDevices(devlist):
1859   """Rename a list of block devices.
1860
1861   @type devlist: list of tuples
1862   @param devlist: list of tuples of the form  (disk,
1863       new_logical_id, new_physical_id); disk is an
1864       L{objects.Disk} object describing the current disk,
1865       and new logical_id/physical_id is the name we
1866       rename it to
1867   @rtype: boolean
1868   @return: True if all renames succeeded, False otherwise
1869
1870   """
1871   result = True
1872   for disk, unique_id in devlist:
1873     dev = _RecursiveFindBD(disk)
1874     if dev is None:
1875       result = False
1876       continue
1877     try:
1878       old_rpath = dev.dev_path
1879       dev.Rename(unique_id)
1880       new_rpath = dev.dev_path
1881       if old_rpath != new_rpath:
1882         DevCacheManager.RemoveCache(old_rpath)
1883         # FIXME: we should add the new cache information here, like:
1884         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1885         # but we don't have the owner here - maybe parse from existing
1886         # cache? for now, we only lose lvm data when we rename, which
1887         # is less critical than DRBD or MD
1888     except errors.BlockDeviceError, err:
1889       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1890       result = False
1891   return result
1892
1893
1894 def _TransformFileStorageDir(file_storage_dir):
1895   """Checks whether given file_storage_dir is valid.
1896
1897   Checks wheter the given file_storage_dir is within the cluster-wide
1898   default file_storage_dir stored in SimpleStore. Only paths under that
1899   directory are allowed.
1900
1901   @type file_storage_dir: str
1902   @param file_storage_dir: the path to check
1903
1904   @return: the normalized path if valid, None otherwise
1905
1906   """
1907   cfg = _GetConfig()
1908   file_storage_dir = os.path.normpath(file_storage_dir)
1909   base_file_storage_dir = cfg.GetFileStorageDir()
1910   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1911       base_file_storage_dir):
1912     logging.error("file storage directory '%s' is not under base file"
1913                   " storage directory '%s'",
1914                   file_storage_dir, base_file_storage_dir)
1915     return None
1916   return file_storage_dir
1917
1918
1919 def CreateFileStorageDir(file_storage_dir):
1920   """Create file storage directory.
1921
1922   @type file_storage_dir: str
1923   @param file_storage_dir: directory to create
1924
1925   @rtype: tuple
1926   @return: tuple with first element a boolean indicating wheter dir
1927       creation was successful or not
1928
1929   """
1930   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1931   result = True,
1932   if not file_storage_dir:
1933     result = False,
1934   else:
1935     if os.path.exists(file_storage_dir):
1936       if not os.path.isdir(file_storage_dir):
1937         logging.error("'%s' is not a directory", file_storage_dir)
1938         result = False,
1939     else:
1940       try:
1941         os.makedirs(file_storage_dir, 0750)
1942       except OSError, err:
1943         logging.error("Cannot create file storage directory '%s': %s",
1944                       file_storage_dir, err)
1945         result = False,
1946   return result
1947
1948
1949 def RemoveFileStorageDir(file_storage_dir):
1950   """Remove file storage directory.
1951
1952   Remove it only if it's empty. If not log an error and return.
1953
1954   @type file_storage_dir: str
1955   @param file_storage_dir: the directory we should cleanup
1956   @rtype: tuple (success,)
1957   @return: tuple of one element, C{success}, denoting
1958       whether the operation was successfull
1959
1960   """
1961   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1962   result = True,
1963   if not file_storage_dir:
1964     result = False,
1965   else:
1966     if os.path.exists(file_storage_dir):
1967       if not os.path.isdir(file_storage_dir):
1968         logging.error("'%s' is not a directory", file_storage_dir)
1969         result = False,
1970       # deletes dir only if empty, otherwise we want to return False
1971       try:
1972         os.rmdir(file_storage_dir)
1973       except OSError, err:
1974         logging.exception("Cannot remove file storage directory '%s'",
1975                           file_storage_dir)
1976         result = False,
1977   return result
1978
1979
1980 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1981   """Rename the file storage directory.
1982
1983   @type old_file_storage_dir: str
1984   @param old_file_storage_dir: the current path
1985   @type new_file_storage_dir: str
1986   @param new_file_storage_dir: the name we should rename to
1987   @rtype: tuple (success,)
1988   @return: tuple of one element, C{success}, denoting
1989       whether the operation was successful
1990
1991   """
1992   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1993   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1994   result = True,
1995   if not old_file_storage_dir or not new_file_storage_dir:
1996     result = False,
1997   else:
1998     if not os.path.exists(new_file_storage_dir):
1999       if os.path.isdir(old_file_storage_dir):
2000         try:
2001           os.rename(old_file_storage_dir, new_file_storage_dir)
2002         except OSError, err:
2003           logging.exception("Cannot rename '%s' to '%s'",
2004                             old_file_storage_dir, new_file_storage_dir)
2005           result =  False,
2006       else:
2007         logging.error("'%s' is not a directory", old_file_storage_dir)
2008         result = False,
2009     else:
2010       if os.path.exists(old_file_storage_dir):
2011         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2012                       old_file_storage_dir, new_file_storage_dir)
2013         result = False,
2014   return result
2015
2016
2017 def _IsJobQueueFile(file_name):
2018   """Checks whether the given filename is in the queue directory.
2019
2020   @type file_name: str
2021   @param file_name: the file name we should check
2022   @rtype: boolean
2023   @return: whether the file is under the queue directory
2024
2025   """
2026   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2027   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2028
2029   if not result:
2030     logging.error("'%s' is not a file in the queue directory",
2031                   file_name)
2032
2033   return result
2034
2035
2036 def JobQueueUpdate(file_name, content):
2037   """Updates a file in the queue directory.
2038
2039   This is just a wrapper over L{utils.WriteFile}, with proper
2040   checking.
2041
2042   @type file_name: str
2043   @param file_name: the job file name
2044   @type content: str
2045   @param content: the new job contents
2046   @rtype: boolean
2047   @return: the success of the operation
2048
2049   """
2050   if not _IsJobQueueFile(file_name):
2051     return False
2052
2053   # Write and replace the file atomically
2054   utils.WriteFile(file_name, data=_Decompress(content))
2055
2056   return True
2057
2058
2059 def JobQueueRename(old, new):
2060   """Renames a job queue file.
2061
2062   This is just a wrapper over os.rename with proper checking.
2063
2064   @type old: str
2065   @param old: the old (actual) file name
2066   @type new: str
2067   @param new: the desired file name
2068   @rtype: boolean
2069   @return: the success of the operation
2070
2071   """
2072   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2073     return False
2074
2075   utils.RenameFile(old, new, mkdir=True)
2076
2077   return True
2078
2079
2080 def JobQueueSetDrainFlag(drain_flag):
2081   """Set the drain flag for the queue.
2082
2083   This will set or unset the queue drain flag.
2084
2085   @type drain_flag: boolean
2086   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2087   @rtype: boolean
2088   @return: always True
2089   @warning: the function always returns True
2090
2091   """
2092   if drain_flag:
2093     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2094   else:
2095     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2096
2097   return True
2098
2099
2100 def CloseBlockDevices(instance_name, disks):
2101   """Closes the given block devices.
2102
2103   This means they will be switched to secondary mode (in case of
2104   DRBD).
2105
2106   @param instance_name: if the argument is not empty, the symlinks
2107       of this instance will be removed
2108   @type disks: list of L{objects.Disk}
2109   @param disks: the list of disks to be closed
2110   @rtype: tuple (success, message)
2111   @return: a tuple of success and message, where success
2112       indicates the succes of the operation, and message
2113       which will contain the error details in case we
2114       failed
2115
2116   """
2117   bdevs = []
2118   for cf in disks:
2119     rd = _RecursiveFindBD(cf)
2120     if rd is None:
2121       return (False, "Can't find device %s" % cf)
2122     bdevs.append(rd)
2123
2124   msg = []
2125   for rd in bdevs:
2126     try:
2127       rd.Close()
2128     except errors.BlockDeviceError, err:
2129       msg.append(str(err))
2130   if msg:
2131     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2132   else:
2133     if instance_name:
2134       _RemoveBlockDevLinks(instance_name, disks)
2135     return (True, "All devices secondary")
2136
2137
2138 def ValidateHVParams(hvname, hvparams):
2139   """Validates the given hypervisor parameters.
2140
2141   @type hvname: string
2142   @param hvname: the hypervisor name
2143   @type hvparams: dict
2144   @param hvparams: the hypervisor parameters to be validated
2145   @rtype: tuple (success, message)
2146   @return: a tuple of success and message, where success
2147       indicates the succes of the operation, and message
2148       which will contain the error details in case we
2149       failed
2150
2151   """
2152   try:
2153     hv_type = hypervisor.GetHypervisor(hvname)
2154     hv_type.ValidateParameters(hvparams)
2155     return (True, "Validation passed")
2156   except errors.HypervisorError, err:
2157     return (False, str(err))
2158
2159
2160 def DemoteFromMC():
2161   """Demotes the current node from master candidate role.
2162
2163   """
2164   # try to ensure we're not the master by mistake
2165   master, myself = ssconf.GetMasterAndMyself()
2166   if master == myself:
2167     return (False, "ssconf status shows I'm the master node, will not demote")
2168   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2169   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2170     return (False, "The master daemon is running, will not demote")
2171   try:
2172     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2173   except EnvironmentError, err:
2174     if err.errno != errno.ENOENT:
2175       return (False, "Error while backing up cluster file: %s" % str(err))
2176   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2177   return (True, "Done")
2178
2179
2180 def _FindDisks(nodes_ip, disks):
2181   """Sets the physical ID on disks and returns the block devices.
2182
2183   """
2184   # set the correct physical ID
2185   my_name = utils.HostInfo().name
2186   for cf in disks:
2187     cf.SetPhysicalID(my_name, nodes_ip)
2188
2189   bdevs = []
2190
2191   for cf in disks:
2192     rd = _RecursiveFindBD(cf)
2193     if rd is None:
2194       return (False, "Can't find device %s" % cf)
2195     bdevs.append(rd)
2196   return (True, bdevs)
2197
2198
2199 def DrbdDisconnectNet(nodes_ip, disks):
2200   """Disconnects the network on a list of drbd devices.
2201
2202   """
2203   status, bdevs = _FindDisks(nodes_ip, disks)
2204   if not status:
2205     return status, bdevs
2206
2207   # disconnect disks
2208   for rd in bdevs:
2209     try:
2210       rd.DisconnectNet()
2211     except errors.BlockDeviceError, err:
2212       logging.exception("Failed to go into standalone mode")
2213       return (False, "Can't change network configuration: %s" % str(err))
2214   return (True, "All disks are now disconnected")
2215
2216
2217 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2218   """Attaches the network on a list of drbd devices.
2219
2220   """
2221   status, bdevs = _FindDisks(nodes_ip, disks)
2222   if not status:
2223     return status, bdevs
2224
2225   if multimaster:
2226     for cf, rd in zip(disks, bdevs):
2227       try:
2228         _SymlinkBlockDev(instance_name, rd.dev_path, cf.iv_name)
2229       except EnvironmentError, err:
2230         return (False, "Can't create symlink: %s" % str(err))
2231   # reconnect disks, switch to new master configuration and if
2232   # needed primary mode
2233   for rd in bdevs:
2234     try:
2235       rd.AttachNet(multimaster)
2236     except errors.BlockDeviceError, err:
2237       return (False, "Can't change network configuration: %s" % str(err))
2238   # wait until the disks are connected; we need to retry the re-attach
2239   # if the device becomes standalone, as this might happen if the one
2240   # node disconnects and reconnects in a different mode before the
2241   # other node reconnects; in this case, one or both of the nodes will
2242   # decide it has wrong configuration and switch to standalone
2243   RECONNECT_TIMEOUT = 2 * 60
2244   sleep_time = 0.100 # start with 100 miliseconds
2245   timeout_limit = time.time() + RECONNECT_TIMEOUT
2246   while time.time() < timeout_limit:
2247     all_connected = True
2248     for rd in bdevs:
2249       stats = rd.GetProcStatus()
2250       if not (stats.is_connected or stats.is_in_resync):
2251         all_connected = False
2252       if stats.is_standalone:
2253         # peer had different config info and this node became
2254         # standalone, even though this should not happen with the
2255         # new staged way of changing disk configs
2256         try:
2257           rd.ReAttachNet(multimaster)
2258         except errors.BlockDeviceError, err:
2259           return (False, "Can't change network configuration: %s" % str(err))
2260     if all_connected:
2261       break
2262     time.sleep(sleep_time)
2263     sleep_time = min(5, sleep_time * 1.5)
2264   if not all_connected:
2265     return (False, "Timeout in disk reconnecting")
2266   if multimaster:
2267     # change to primary mode
2268     for rd in bdevs:
2269       rd.Open()
2270   if multimaster:
2271     msg = "multi-master and primary"
2272   else:
2273     msg = "single-master"
2274   return (True, "Disks are now configured as %s" % msg)
2275
2276
2277 def DrbdWaitSync(nodes_ip, disks):
2278   """Wait until DRBDs have synchronized.
2279
2280   """
2281   status, bdevs = _FindDisks(nodes_ip, disks)
2282   if not status:
2283     return status, bdevs
2284
2285   min_resync = 100
2286   alldone = True
2287   failure = False
2288   for rd in bdevs:
2289     stats = rd.GetProcStatus()
2290     if not (stats.is_connected or stats.is_in_resync):
2291       failure = True
2292       break
2293     alldone = alldone and (not stats.is_in_resync)
2294     if stats.sync_percent is not None:
2295       min_resync = min(min_resync, stats.sync_percent)
2296   return (not failure, (alldone, min_resync))
2297
2298
2299 class HooksRunner(object):
2300   """Hook runner.
2301
2302   This class is instantiated on the node side (ganeti-noded) and not
2303   on the master side.
2304
2305   """
2306   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2307
2308   def __init__(self, hooks_base_dir=None):
2309     """Constructor for hooks runner.
2310
2311     @type hooks_base_dir: str or None
2312     @param hooks_base_dir: if not None, this overrides the
2313         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2314
2315     """
2316     if hooks_base_dir is None:
2317       hooks_base_dir = constants.HOOKS_BASE_DIR
2318     self._BASE_DIR = hooks_base_dir
2319
2320   @staticmethod
2321   def ExecHook(script, env):
2322     """Exec one hook script.
2323
2324     @type script: str
2325     @param script: the full path to the script
2326     @type env: dict
2327     @param env: the environment with which to exec the script
2328     @rtype: tuple (success, message)
2329     @return: a tuple of success and message, where success
2330         indicates the succes of the operation, and message
2331         which will contain the error details in case we
2332         failed
2333
2334     """
2335     # exec the process using subprocess and log the output
2336     fdstdin = None
2337     try:
2338       fdstdin = open("/dev/null", "r")
2339       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2340                                stderr=subprocess.STDOUT, close_fds=True,
2341                                shell=False, cwd="/", env=env)
2342       output = ""
2343       try:
2344         output = child.stdout.read(4096)
2345         child.stdout.close()
2346       except EnvironmentError, err:
2347         output += "Hook script error: %s" % str(err)
2348
2349       while True:
2350         try:
2351           result = child.wait()
2352           break
2353         except EnvironmentError, err:
2354           if err.errno == errno.EINTR:
2355             continue
2356           raise
2357     finally:
2358       # try not to leak fds
2359       for fd in (fdstdin, ):
2360         if fd is not None:
2361           try:
2362             fd.close()
2363           except EnvironmentError, err:
2364             # just log the error
2365             #logging.exception("Error while closing fd %s", fd)
2366             pass
2367
2368     return result == 0, output
2369
2370   def RunHooks(self, hpath, phase, env):
2371     """Run the scripts in the hooks directory.
2372
2373     @type hpath: str
2374     @param hpath: the path to the hooks directory which
2375         holds the scripts
2376     @type phase: str
2377     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2378         L{constants.HOOKS_PHASE_POST}
2379     @type env: dict
2380     @param env: dictionary with the environment for the hook
2381     @rtype: list
2382     @return: list of 3-element tuples:
2383       - script path
2384       - script result, either L{constants.HKR_SUCCESS} or
2385         L{constants.HKR_FAIL}
2386       - output of the script
2387
2388     @raise errors.ProgrammerError: for invalid input
2389         parameters
2390
2391     """
2392     if phase == constants.HOOKS_PHASE_PRE:
2393       suffix = "pre"
2394     elif phase == constants.HOOKS_PHASE_POST:
2395       suffix = "post"
2396     else:
2397       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2398     rr = []
2399
2400     subdir = "%s-%s.d" % (hpath, suffix)
2401     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2402     try:
2403       dir_contents = utils.ListVisibleFiles(dir_name)
2404     except OSError, err:
2405       # FIXME: must log output in case of failures
2406       return rr
2407
2408     # we use the standard python sort order,
2409     # so 00name is the recommended naming scheme
2410     dir_contents.sort()
2411     for relname in dir_contents:
2412       fname = os.path.join(dir_name, relname)
2413       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2414           self.RE_MASK.match(relname) is not None):
2415         rrval = constants.HKR_SKIP
2416         output = ""
2417       else:
2418         result, output = self.ExecHook(fname, env)
2419         if not result:
2420           rrval = constants.HKR_FAIL
2421         else:
2422           rrval = constants.HKR_SUCCESS
2423       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2424
2425     return rr
2426
2427
2428 class IAllocatorRunner(object):
2429   """IAllocator runner.
2430
2431   This class is instantiated on the node side (ganeti-noded) and not on
2432   the master side.
2433
2434   """
2435   def Run(self, name, idata):
2436     """Run an iallocator script.
2437
2438     @type name: str
2439     @param name: the iallocator script name
2440     @type idata: str
2441     @param idata: the allocator input data
2442
2443     @rtype: tuple
2444     @return: four element tuple of:
2445        - run status (one of the IARUN_ constants)
2446        - stdout
2447        - stderr
2448        - fail reason (as from L{utils.RunResult})
2449
2450     """
2451     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2452                                   os.path.isfile)
2453     if alloc_script is None:
2454       return (constants.IARUN_NOTFOUND, None, None, None)
2455
2456     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2457     try:
2458       os.write(fd, idata)
2459       os.close(fd)
2460       result = utils.RunCmd([alloc_script, fin_name])
2461       if result.failed:
2462         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2463                 result.fail_reason)
2464     finally:
2465       os.unlink(fin_name)
2466
2467     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2468
2469
2470 class DevCacheManager(object):
2471   """Simple class for managing a cache of block device information.
2472
2473   """
2474   _DEV_PREFIX = "/dev/"
2475   _ROOT_DIR = constants.BDEV_CACHE_DIR
2476
2477   @classmethod
2478   def _ConvertPath(cls, dev_path):
2479     """Converts a /dev/name path to the cache file name.
2480
2481     This replaces slashes with underscores and strips the /dev
2482     prefix. It then returns the full path to the cache file.
2483
2484     @type dev_path: str
2485     @param dev_path: the C{/dev/} path name
2486     @rtype: str
2487     @return: the converted path name
2488
2489     """
2490     if dev_path.startswith(cls._DEV_PREFIX):
2491       dev_path = dev_path[len(cls._DEV_PREFIX):]
2492     dev_path = dev_path.replace("/", "_")
2493     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2494     return fpath
2495
2496   @classmethod
2497   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2498     """Updates the cache information for a given device.
2499
2500     @type dev_path: str
2501     @param dev_path: the pathname of the device
2502     @type owner: str
2503     @param owner: the owner (instance name) of the device
2504     @type on_primary: bool
2505     @param on_primary: whether this is the primary
2506         node nor not
2507     @type iv_name: str
2508     @param iv_name: the instance-visible name of the
2509         device, as in objects.Disk.iv_name
2510
2511     @rtype: None
2512
2513     """
2514     if dev_path is None:
2515       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2516       return
2517     fpath = cls._ConvertPath(dev_path)
2518     if on_primary:
2519       state = "primary"
2520     else:
2521       state = "secondary"
2522     if iv_name is None:
2523       iv_name = "not_visible"
2524     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2525     try:
2526       utils.WriteFile(fpath, data=fdata)
2527     except EnvironmentError, err:
2528       logging.exception("Can't update bdev cache for %s", dev_path)
2529
2530   @classmethod
2531   def RemoveCache(cls, dev_path):
2532     """Remove data for a dev_path.
2533
2534     This is just a wrapper over L{utils.RemoveFile} with a converted
2535     path name and logging.
2536
2537     @type dev_path: str
2538     @param dev_path: the pathname of the device
2539
2540     @rtype: None
2541
2542     """
2543     if dev_path is None:
2544       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2545       return
2546     fpath = cls._ConvertPath(dev_path)
2547     try:
2548       utils.RemoveFile(fpath)
2549     except EnvironmentError, err:
2550       logging.exception("Can't update bdev cache for %s", dev_path)