Convert the start instance rpc to (status, data)
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     logging.exception("Error while processing user ssh files")
264     return False
265
266   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267     utils.WriteFile(name, data=content, mode=0600)
268
269   utils.AddAuthorizedKey(auth_keys, sshpub)
270
271   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272
273   return True
274
275
276 def LeaveCluster():
277   """Cleans up and remove the current node.
278
279   This function cleans up and prepares the current node to be removed
280   from the cluster.
281
282   If processing is successful, then it raises an
283   L{errors.QuitGanetiException} which is used as a special case to
284   shutdown the node daemon.
285
286   """
287   _CleanDirectory(constants.DATA_DIR)
288   JobQueuePurge()
289
290   try:
291     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292   except errors.OpExecError:
293     logging.exception("Error while processing ssh files")
294     return
295
296   f = open(pub_key, 'r')
297   try:
298     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
299   finally:
300     f.close()
301
302   utils.RemoveFile(priv_key)
303   utils.RemoveFile(pub_key)
304
305   # Return a reassuring string to the caller, and quit
306   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307
308
309 def GetNodeInfo(vgname, hypervisor_type):
310   """Gives back a hash with different informations about the node.
311
312   @type vgname: C{string}
313   @param vgname: the name of the volume group to ask for disk space information
314   @type hypervisor_type: C{str}
315   @param hypervisor_type: the name of the hypervisor to ask for
316       memory information
317   @rtype: C{dict}
318   @return: dictionary with the following keys:
319       - vg_size is the size of the configured volume group in MiB
320       - vg_free is the free size of the volume group in MiB
321       - memory_dom0 is the memory allocated for domain0 in MiB
322       - memory_free is the currently available (free) ram in MiB
323       - memory_total is the total number of ram in MiB
324
325   """
326   outputarray = {}
327   vginfo = _GetVGInfo(vgname)
328   outputarray['vg_size'] = vginfo['vg_size']
329   outputarray['vg_free'] = vginfo['vg_free']
330
331   hyper = hypervisor.GetHypervisor(hypervisor_type)
332   hyp_info = hyper.GetNodeInfo()
333   if hyp_info is not None:
334     outputarray.update(hyp_info)
335
336   f = open("/proc/sys/kernel/random/boot_id", 'r')
337   try:
338     outputarray["bootid"] = f.read(128).rstrip("\n")
339   finally:
340     f.close()
341
342   return outputarray
343
344
345 def VerifyNode(what, cluster_name):
346   """Verify the status of the local node.
347
348   Based on the input L{what} parameter, various checks are done on the
349   local node.
350
351   If the I{filelist} key is present, this list of
352   files is checksummed and the file/checksum pairs are returned.
353
354   If the I{nodelist} key is present, we check that we have
355   connectivity via ssh with the target nodes (and check the hostname
356   report).
357
358   If the I{node-net-test} key is present, we check that we have
359   connectivity to the given nodes via both primary IP and, if
360   applicable, secondary IPs.
361
362   @type what: C{dict}
363   @param what: a dictionary of things to check:
364       - filelist: list of files for which to compute checksums
365       - nodelist: list of nodes we should check ssh communication with
366       - node-net-test: list of nodes we should check node daemon port
367         connectivity with
368       - hypervisor: list with hypervisors to run the verify for
369   @rtype: dict
370   @return: a dictionary with the same keys as the input dict, and
371       values representing the result of the checks
372
373   """
374   result = {}
375
376   if constants.NV_HYPERVISOR in what:
377     result[constants.NV_HYPERVISOR] = tmp = {}
378     for hv_name in what[constants.NV_HYPERVISOR]:
379       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
380
381   if constants.NV_FILELIST in what:
382     result[constants.NV_FILELIST] = utils.FingerprintFiles(
383       what[constants.NV_FILELIST])
384
385   if constants.NV_NODELIST in what:
386     result[constants.NV_NODELIST] = tmp = {}
387     random.shuffle(what[constants.NV_NODELIST])
388     for node in what[constants.NV_NODELIST]:
389       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
390       if not success:
391         tmp[node] = message
392
393   if constants.NV_NODENETTEST in what:
394     result[constants.NV_NODENETTEST] = tmp = {}
395     my_name = utils.HostInfo().name
396     my_pip = my_sip = None
397     for name, pip, sip in what[constants.NV_NODENETTEST]:
398       if name == my_name:
399         my_pip = pip
400         my_sip = sip
401         break
402     if not my_pip:
403       tmp[my_name] = ("Can't find my own primary/secondary IP"
404                       " in the node list")
405     else:
406       port = utils.GetNodeDaemonPort()
407       for name, pip, sip in what[constants.NV_NODENETTEST]:
408         fail = []
409         if not utils.TcpPing(pip, port, source=my_pip):
410           fail.append("primary")
411         if sip != pip:
412           if not utils.TcpPing(sip, port, source=my_sip):
413             fail.append("secondary")
414         if fail:
415           tmp[name] = ("failure using the %s interface(s)" %
416                        " and ".join(fail))
417
418   if constants.NV_LVLIST in what:
419     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
420
421   if constants.NV_INSTANCELIST in what:
422     result[constants.NV_INSTANCELIST] = GetInstanceList(
423       what[constants.NV_INSTANCELIST])
424
425   if constants.NV_VGLIST in what:
426     result[constants.NV_VGLIST] = ListVolumeGroups()
427
428   if constants.NV_VERSION in what:
429     result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
430
431   if constants.NV_HVINFO in what:
432     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
433     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
434
435   return result
436
437
438 def GetVolumeList(vg_name):
439   """Compute list of logical volumes and their size.
440
441   @type vg_name: str
442   @param vg_name: the volume group whose LVs we should list
443   @rtype: dict
444   @return:
445       dictionary of all partions (key) with value being a tuple of
446       their size (in MiB), inactive and online status::
447
448         {'test1': ('20.06', True, True)}
449
450       in case of errors, a string is returned with the error
451       details.
452
453   """
454   lvs = {}
455   sep = '|'
456   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
457                          "--separator=%s" % sep,
458                          "-olv_name,lv_size,lv_attr", vg_name])
459   if result.failed:
460     logging.error("Failed to list logical volumes, lvs output: %s",
461                   result.output)
462     return result.output
463
464   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
465   for line in result.stdout.splitlines():
466     line = line.strip()
467     match = valid_line_re.match(line)
468     if not match:
469       logging.error("Invalid line returned from lvs output: '%s'", line)
470       continue
471     name, size, attr = match.groups()
472     inactive = attr[4] == '-'
473     online = attr[5] == 'o'
474     lvs[name] = (size, inactive, online)
475
476   return lvs
477
478
479 def ListVolumeGroups():
480   """List the volume groups and their size.
481
482   @rtype: dict
483   @return: dictionary with keys volume name and values the
484       size of the volume
485
486   """
487   return utils.ListVolumeGroups()
488
489
490 def NodeVolumes():
491   """List all volumes on this node.
492
493   @rtype: list
494   @return:
495     A list of dictionaries, each having four keys:
496       - name: the logical volume name,
497       - size: the size of the logical volume
498       - dev: the physical device on which the LV lives
499       - vg: the volume group to which it belongs
500
501     In case of errors, we return an empty list and log the
502     error.
503
504     Note that since a logical volume can live on multiple physical
505     volumes, the resulting list might include a logical volume
506     multiple times.
507
508   """
509   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
510                          "--separator=|",
511                          "--options=lv_name,lv_size,devices,vg_name"])
512   if result.failed:
513     logging.error("Failed to list logical volumes, lvs output: %s",
514                   result.output)
515     return []
516
517   def parse_dev(dev):
518     if '(' in dev:
519       return dev.split('(')[0]
520     else:
521       return dev
522
523   def map_line(line):
524     return {
525       'name': line[0].strip(),
526       'size': line[1].strip(),
527       'dev': parse_dev(line[2].strip()),
528       'vg': line[3].strip(),
529     }
530
531   return [map_line(line.split('|')) for line in result.stdout.splitlines()
532           if line.count('|') >= 3]
533
534
535 def BridgesExist(bridges_list):
536   """Check if a list of bridges exist on the current node.
537
538   @rtype: boolean
539   @return: C{True} if all of them exist, C{False} otherwise
540
541   """
542   for bridge in bridges_list:
543     if not utils.BridgeExists(bridge):
544       return False
545
546   return True
547
548
549 def GetInstanceList(hypervisor_list):
550   """Provides a list of instances.
551
552   @type hypervisor_list: list
553   @param hypervisor_list: the list of hypervisors to query information
554
555   @rtype: list
556   @return: a list of all running instances on the current node
557     - instance1.example.com
558     - instance2.example.com
559
560   """
561   results = []
562   for hname in hypervisor_list:
563     try:
564       names = hypervisor.GetHypervisor(hname).ListInstances()
565       results.extend(names)
566     except errors.HypervisorError, err:
567       logging.exception("Error enumerating instances for hypevisor %s", hname)
568       # FIXME: should we somehow not propagate this to the master?
569       raise
570
571   return results
572
573
574 def GetInstanceInfo(instance, hname):
575   """Gives back the informations about an instance as a dictionary.
576
577   @type instance: string
578   @param instance: the instance name
579   @type hname: string
580   @param hname: the hypervisor type of the instance
581
582   @rtype: dict
583   @return: dictionary with the following keys:
584       - memory: memory size of instance (int)
585       - state: xen state of instance (string)
586       - time: cpu time of instance (float)
587
588   """
589   output = {}
590
591   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
592   if iinfo is not None:
593     output['memory'] = iinfo[2]
594     output['state'] = iinfo[4]
595     output['time'] = iinfo[5]
596
597   return output
598
599
600 def GetInstanceMigratable(instance):
601   """Gives whether an instance can be migrated.
602
603   @type instance: L{objects.Instance}
604   @param instance: object representing the instance to be checked.
605
606   @rtype: tuple
607   @return: tuple of (result, description) where:
608       - result: whether the instance can be migrated or not
609       - description: a description of the issue, if relevant
610
611   """
612   hyper = hypervisor.GetHypervisor(instance.hypervisor)
613   if instance.name not in hyper.ListInstances():
614     return (False, 'not running')
615
616   for idx in range(len(instance.disks)):
617     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
618     if not os.path.islink(link_name):
619       return (False, 'not restarted since ganeti 1.2.5')
620
621   return (True, '')
622
623
624 def GetAllInstancesInfo(hypervisor_list):
625   """Gather data about all instances.
626
627   This is the equivalent of L{GetInstanceInfo}, except that it
628   computes data for all instances at once, thus being faster if one
629   needs data about more than one instance.
630
631   @type hypervisor_list: list
632   @param hypervisor_list: list of hypervisors to query for instance data
633
634   @rtype: dict
635   @return: dictionary of instance: data, with data having the following keys:
636       - memory: memory size of instance (int)
637       - state: xen state of instance (string)
638       - time: cpu time of instance (float)
639       - vcpus: the number of vcpus
640
641   """
642   output = {}
643
644   for hname in hypervisor_list:
645     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
646     if iinfo:
647       for name, inst_id, memory, vcpus, state, times in iinfo:
648         value = {
649           'memory': memory,
650           'vcpus': vcpus,
651           'state': state,
652           'time': times,
653           }
654         if name in output and output[name] != value:
655           raise errors.HypervisorError("Instance %s running duplicate"
656                                        " with different parameters" % name)
657         output[name] = value
658
659   return output
660
661
662 def AddOSToInstance(instance):
663   """Add an OS to an instance.
664
665   @type instance: L{objects.Instance}
666   @param instance: Instance whose OS is to be installed
667   @rtype: boolean
668   @return: the success of the operation
669
670   """
671   inst_os = OSFromDisk(instance.os)
672
673   create_env = OSEnvironment(instance)
674
675   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
676                                      instance.name, int(time.time()))
677
678   result = utils.RunCmd([inst_os.create_script], env=create_env,
679                         cwd=inst_os.path, output=logfile,)
680   if result.failed:
681     logging.error("os create command '%s' returned error: %s, logfile: %s,"
682                   " output: %s", result.cmd, result.fail_reason, logfile,
683                   result.output)
684     return False
685
686   return True
687
688
689 def RunRenameInstance(instance, old_name):
690   """Run the OS rename script for an instance.
691
692   @type instance: L{objects.Instance}
693   @param instance: Instance whose OS is to be installed
694   @type old_name: string
695   @param old_name: previous instance name
696   @rtype: boolean
697   @return: the success of the operation
698
699   """
700   inst_os = OSFromDisk(instance.os)
701
702   rename_env = OSEnvironment(instance)
703   rename_env['OLD_INSTANCE_NAME'] = old_name
704
705   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
706                                            old_name,
707                                            instance.name, int(time.time()))
708
709   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
710                         cwd=inst_os.path, output=logfile)
711
712   if result.failed:
713     logging.error("os create command '%s' returned error: %s output: %s",
714                   result.cmd, result.fail_reason, result.output)
715     return False
716
717   return True
718
719
720 def _GetVGInfo(vg_name):
721   """Get informations about the volume group.
722
723   @type vg_name: str
724   @param vg_name: the volume group which we query
725   @rtype: dict
726   @return:
727     A dictionary with the following keys:
728       - C{vg_size} is the total size of the volume group in MiB
729       - C{vg_free} is the free size of the volume group in MiB
730       - C{pv_count} are the number of physical disks in that VG
731
732     If an error occurs during gathering of data, we return the same dict
733     with keys all set to None.
734
735   """
736   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
737
738   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
739                          "--nosuffix", "--units=m", "--separator=:", vg_name])
740
741   if retval.failed:
742     logging.error("volume group %s not present", vg_name)
743     return retdic
744   valarr = retval.stdout.strip().rstrip(':').split(':')
745   if len(valarr) == 3:
746     try:
747       retdic = {
748         "vg_size": int(round(float(valarr[0]), 0)),
749         "vg_free": int(round(float(valarr[1]), 0)),
750         "pv_count": int(valarr[2]),
751         }
752     except ValueError, err:
753       logging.exception("Fail to parse vgs output")
754   else:
755     logging.error("vgs output has the wrong number of fields (expected"
756                   " three): %s", str(valarr))
757   return retdic
758
759
760 def _GetBlockDevSymlinkPath(instance_name, idx):
761   return os.path.join(constants.DISK_LINKS_DIR,
762                       "%s:%d" % (instance_name, idx))
763
764
765 def _SymlinkBlockDev(instance_name, device_path, idx):
766   """Set up symlinks to a instance's block device.
767
768   This is an auxiliary function run when an instance is start (on the primary
769   node) or when an instance is migrated (on the target node).
770
771
772   @param instance_name: the name of the target instance
773   @param device_path: path of the physical block device, on the node
774   @param idx: the disk index
775   @return: absolute path to the disk's symlink
776
777   """
778   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
779   try:
780     os.symlink(device_path, link_name)
781   except OSError, err:
782     if err.errno == errno.EEXIST:
783       if (not os.path.islink(link_name) or
784           os.readlink(link_name) != device_path):
785         os.remove(link_name)
786         os.symlink(device_path, link_name)
787     else:
788       raise
789
790   return link_name
791
792
793 def _RemoveBlockDevLinks(instance_name, disks):
794   """Remove the block device symlinks belonging to the given instance.
795
796   """
797   for idx, disk in enumerate(disks):
798     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
799     if os.path.islink(link_name):
800       try:
801         os.remove(link_name)
802       except OSError:
803         logging.exception("Can't remove symlink '%s'", link_name)
804
805
806 def _GatherAndLinkBlockDevs(instance):
807   """Set up an instance's block device(s).
808
809   This is run on the primary node at instance startup. The block
810   devices must be already assembled.
811
812   @type instance: L{objects.Instance}
813   @param instance: the instance whose disks we shoul assemble
814   @rtype: list
815   @return: list of (disk_object, device_path)
816
817   """
818   block_devices = []
819   for idx, disk in enumerate(instance.disks):
820     device = _RecursiveFindBD(disk)
821     if device is None:
822       raise errors.BlockDeviceError("Block device '%s' is not set up." %
823                                     str(disk))
824     device.Open()
825     try:
826       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
827     except OSError, e:
828       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
829                                     e.strerror)
830
831     block_devices.append((disk, link_name))
832
833   return block_devices
834
835
836 def StartInstance(instance, extra_args):
837   """Start an instance.
838
839   @type instance: L{objects.Instance}
840   @param instance: the instance object
841   @rtype: boolean
842   @return: whether the startup was successful or not
843
844   """
845   running_instances = GetInstanceList([instance.hypervisor])
846
847   if instance.name in running_instances:
848     return (True, "Already running")
849
850   try:
851     block_devices = _GatherAndLinkBlockDevs(instance)
852     hyper = hypervisor.GetHypervisor(instance.hypervisor)
853     hyper.StartInstance(instance, block_devices, extra_args)
854   except errors.BlockDeviceError, err:
855     logging.exception("Failed to start instance")
856     return (False, "Block device error: %s" % str(err))
857   except errors.HypervisorError, err:
858     logging.exception("Failed to start instance")
859     _RemoveBlockDevLinks(instance.name, instance.disks)
860     return (False, "Hypervisor error: %s" % str(err))
861
862   return (True, "Instance started successfully")
863
864
865 def ShutdownInstance(instance):
866   """Shut an instance down.
867
868   @note: this functions uses polling with a hardcoded timeout.
869
870   @type instance: L{objects.Instance}
871   @param instance: the instance object
872   @rtype: boolean
873   @return: whether the startup was successful or not
874
875   """
876   hv_name = instance.hypervisor
877   running_instances = GetInstanceList([hv_name])
878
879   if instance.name not in running_instances:
880     return True
881
882   hyper = hypervisor.GetHypervisor(hv_name)
883   try:
884     hyper.StopInstance(instance)
885   except errors.HypervisorError, err:
886     logging.error("Failed to stop instance")
887     return False
888
889   # test every 10secs for 2min
890
891   time.sleep(1)
892   for dummy in range(11):
893     if instance.name not in GetInstanceList([hv_name]):
894       break
895     time.sleep(10)
896   else:
897     # the shutdown did not succeed
898     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
899
900     try:
901       hyper.StopInstance(instance, force=True)
902     except errors.HypervisorError, err:
903       logging.exception("Failed to stop instance")
904       return False
905
906     time.sleep(1)
907     if instance.name in GetInstanceList([hv_name]):
908       logging.error("could not shutdown instance '%s' even by destroy",
909                     instance.name)
910       return False
911
912   _RemoveBlockDevLinks(instance.name, instance.disks)
913
914   return True
915
916
917 def RebootInstance(instance, reboot_type, extra_args):
918   """Reboot an instance.
919
920   @type instance: L{objects.Instance}
921   @param instance: the instance object to reboot
922   @type reboot_type: str
923   @param reboot_type: the type of reboot, one the following
924     constants:
925       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
926         instance OS, do not recreate the VM
927       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
928         restart the VM (at the hypervisor level)
929       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
930         is not accepted here, since that mode is handled
931         differently
932   @rtype: boolean
933   @return: the success of the operation
934
935   """
936   running_instances = GetInstanceList([instance.hypervisor])
937
938   if instance.name not in running_instances:
939     logging.error("Cannot reboot instance that is not running")
940     return False
941
942   hyper = hypervisor.GetHypervisor(instance.hypervisor)
943   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
944     try:
945       hyper.RebootInstance(instance)
946     except errors.HypervisorError, err:
947       logging.exception("Failed to soft reboot instance")
948       return False
949   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
950     try:
951       ShutdownInstance(instance)
952       StartInstance(instance, extra_args)
953     except errors.HypervisorError, err:
954       logging.exception("Failed to hard reboot instance")
955       return False
956   else:
957     raise errors.ParameterError("reboot_type invalid")
958
959   return True
960
961
962 def MigrateInstance(instance, target, live):
963   """Migrates an instance to another node.
964
965   @type instance: L{objects.Instance}
966   @param instance: the instance definition
967   @type target: string
968   @param target: the target node name
969   @type live: boolean
970   @param live: whether the migration should be done live or not (the
971       interpretation of this parameter is left to the hypervisor)
972   @rtype: tuple
973   @return: a tuple of (success, msg) where:
974       - succes is a boolean denoting the success/failure of the operation
975       - msg is a string with details in case of failure
976
977   """
978   hyper = hypervisor.GetHypervisor(instance.hypervisor)
979
980   try:
981     hyper.MigrateInstance(instance.name, target, live)
982   except errors.HypervisorError, err:
983     msg = "Failed to migrate instance"
984     logging.exception(msg)
985     return (False, "%s: %s" % (msg, err))
986   return (True, "Migration successfull")
987
988
989 def CreateBlockDevice(disk, size, owner, on_primary, info):
990   """Creates a block device for an instance.
991
992   @type disk: L{objects.Disk}
993   @param disk: the object describing the disk we should create
994   @type size: int
995   @param size: the size of the physical underlying device, in MiB
996   @type owner: str
997   @param owner: the name of the instance for which disk is created,
998       used for device cache data
999   @type on_primary: boolean
1000   @param on_primary:  indicates if it is the primary node or not
1001   @type info: string
1002   @param info: string that will be sent to the physical device
1003       creation, used for example to set (LVM) tags on LVs
1004
1005   @return: the new unique_id of the device (this can sometime be
1006       computed only after creation), or None. On secondary nodes,
1007       it's not required to return anything.
1008
1009   """
1010   clist = []
1011   if disk.children:
1012     for child in disk.children:
1013       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1014       if on_primary or disk.AssembleOnSecondary():
1015         # we need the children open in case the device itself has to
1016         # be assembled
1017         crdev.Open()
1018       clist.append(crdev)
1019
1020   try:
1021     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1022   except errors.GenericError, err:
1023     return False, "Can't create block device: %s" % str(err)
1024
1025   if on_primary or disk.AssembleOnSecondary():
1026     if not device.Assemble():
1027       errorstring = "Can't assemble device after creation, very unusual event"
1028       logging.error(errorstring)
1029       return False, errorstring
1030     device.SetSyncSpeed(constants.SYNC_SPEED)
1031     if on_primary or disk.OpenOnSecondary():
1032       device.Open(force=True)
1033     DevCacheManager.UpdateCache(device.dev_path, owner,
1034                                 on_primary, disk.iv_name)
1035
1036   device.SetInfo(info)
1037
1038   physical_id = device.unique_id
1039   return True, physical_id
1040
1041
1042 def RemoveBlockDevice(disk):
1043   """Remove a block device.
1044
1045   @note: This is intended to be called recursively.
1046
1047   @type disk: L{objects.Disk}
1048   @param disk: the disk object we should remove
1049   @rtype: boolean
1050   @return: the success of the operation
1051
1052   """
1053   try:
1054     rdev = _RecursiveFindBD(disk)
1055   except errors.BlockDeviceError, err:
1056     # probably can't attach
1057     logging.info("Can't attach to device %s in remove", disk)
1058     rdev = None
1059   if rdev is not None:
1060     r_path = rdev.dev_path
1061     result = rdev.Remove()
1062     if result:
1063       DevCacheManager.RemoveCache(r_path)
1064   else:
1065     result = True
1066   if disk.children:
1067     for child in disk.children:
1068       result = result and RemoveBlockDevice(child)
1069   return result
1070
1071
1072 def _RecursiveAssembleBD(disk, owner, as_primary):
1073   """Activate a block device for an instance.
1074
1075   This is run on the primary and secondary nodes for an instance.
1076
1077   @note: this function is called recursively.
1078
1079   @type disk: L{objects.Disk}
1080   @param disk: the disk we try to assemble
1081   @type owner: str
1082   @param owner: the name of the instance which owns the disk
1083   @type as_primary: boolean
1084   @param as_primary: if we should make the block device
1085       read/write
1086
1087   @return: the assembled device or None (in case no device
1088       was assembled)
1089   @raise errors.BlockDeviceError: in case there is an error
1090       during the activation of the children or the device
1091       itself
1092
1093   """
1094   children = []
1095   if disk.children:
1096     mcn = disk.ChildrenNeeded()
1097     if mcn == -1:
1098       mcn = 0 # max number of Nones allowed
1099     else:
1100       mcn = len(disk.children) - mcn # max number of Nones
1101     for chld_disk in disk.children:
1102       try:
1103         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1104       except errors.BlockDeviceError, err:
1105         if children.count(None) >= mcn:
1106           raise
1107         cdev = None
1108         logging.debug("Error in child activation: %s", str(err))
1109       children.append(cdev)
1110
1111   if as_primary or disk.AssembleOnSecondary():
1112     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1113     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1114     result = r_dev
1115     if as_primary or disk.OpenOnSecondary():
1116       r_dev.Open()
1117     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1118                                 as_primary, disk.iv_name)
1119
1120   else:
1121     result = True
1122   return result
1123
1124
1125 def AssembleBlockDevice(disk, owner, as_primary):
1126   """Activate a block device for an instance.
1127
1128   This is a wrapper over _RecursiveAssembleBD.
1129
1130   @rtype: str or boolean
1131   @return: a C{/dev/...} path for primary nodes, and
1132       C{True} for secondary nodes
1133
1134   """
1135   result = _RecursiveAssembleBD(disk, owner, as_primary)
1136   if isinstance(result, bdev.BlockDev):
1137     result = result.dev_path
1138   return result
1139
1140
1141 def ShutdownBlockDevice(disk):
1142   """Shut down a block device.
1143
1144   First, if the device is assembled (Attach() is successfull), then
1145   the device is shutdown. Then the children of the device are
1146   shutdown.
1147
1148   This function is called recursively. Note that we don't cache the
1149   children or such, as oppossed to assemble, shutdown of different
1150   devices doesn't require that the upper device was active.
1151
1152   @type disk: L{objects.Disk}
1153   @param disk: the description of the disk we should
1154       shutdown
1155   @rtype: boolean
1156   @return: the success of the operation
1157
1158   """
1159   r_dev = _RecursiveFindBD(disk)
1160   if r_dev is not None:
1161     r_path = r_dev.dev_path
1162     result = r_dev.Shutdown()
1163     if result:
1164       DevCacheManager.RemoveCache(r_path)
1165   else:
1166     result = True
1167   if disk.children:
1168     for child in disk.children:
1169       result = result and ShutdownBlockDevice(child)
1170   return result
1171
1172
1173 def MirrorAddChildren(parent_cdev, new_cdevs):
1174   """Extend a mirrored block device.
1175
1176   @type parent_cdev: L{objects.Disk}
1177   @param parent_cdev: the disk to which we should add children
1178   @type new_cdevs: list of L{objects.Disk}
1179   @param new_cdevs: the list of children which we should add
1180   @rtype: boolean
1181   @return: the success of the operation
1182
1183   """
1184   parent_bdev = _RecursiveFindBD(parent_cdev)
1185   if parent_bdev is None:
1186     logging.error("Can't find parent device")
1187     return False
1188   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1189   if new_bdevs.count(None) > 0:
1190     logging.error("Can't find new device(s) to add: %s:%s",
1191                   new_bdevs, new_cdevs)
1192     return False
1193   parent_bdev.AddChildren(new_bdevs)
1194   return True
1195
1196
1197 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1198   """Shrink a mirrored block device.
1199
1200   @type parent_cdev: L{objects.Disk}
1201   @param parent_cdev: the disk from which we should remove children
1202   @type new_cdevs: list of L{objects.Disk}
1203   @param new_cdevs: the list of children which we should remove
1204   @rtype: boolean
1205   @return: the success of the operation
1206
1207   """
1208   parent_bdev = _RecursiveFindBD(parent_cdev)
1209   if parent_bdev is None:
1210     logging.error("Can't find parent in remove children: %s", parent_cdev)
1211     return False
1212   devs = []
1213   for disk in new_cdevs:
1214     rpath = disk.StaticDevPath()
1215     if rpath is None:
1216       bd = _RecursiveFindBD(disk)
1217       if bd is None:
1218         logging.error("Can't find dynamic device %s while removing children",
1219                       disk)
1220         return False
1221       else:
1222         devs.append(bd.dev_path)
1223     else:
1224       devs.append(rpath)
1225   parent_bdev.RemoveChildren(devs)
1226   return True
1227
1228
1229 def GetMirrorStatus(disks):
1230   """Get the mirroring status of a list of devices.
1231
1232   @type disks: list of L{objects.Disk}
1233   @param disks: the list of disks which we should query
1234   @rtype: disk
1235   @return:
1236       a list of (mirror_done, estimated_time) tuples, which
1237       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1238   @raise errors.BlockDeviceError: if any of the disks cannot be
1239       found
1240
1241   """
1242   stats = []
1243   for dsk in disks:
1244     rbd = _RecursiveFindBD(dsk)
1245     if rbd is None:
1246       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1247     stats.append(rbd.CombinedSyncStatus())
1248   return stats
1249
1250
1251 def _RecursiveFindBD(disk):
1252   """Check if a device is activated.
1253
1254   If so, return informations about the real device.
1255
1256   @type disk: L{objects.Disk}
1257   @param disk: the disk object we need to find
1258
1259   @return: None if the device can't be found,
1260       otherwise the device instance
1261
1262   """
1263   children = []
1264   if disk.children:
1265     for chdisk in disk.children:
1266       children.append(_RecursiveFindBD(chdisk))
1267
1268   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1269
1270
1271 def FindBlockDevice(disk):
1272   """Check if a device is activated.
1273
1274   If it is, return informations about the real device.
1275
1276   @type disk: L{objects.Disk}
1277   @param disk: the disk to find
1278   @rtype: None or tuple
1279   @return: None if the disk cannot be found, otherwise a
1280       tuple (device_path, major, minor, sync_percent,
1281       estimated_time, is_degraded)
1282
1283   """
1284   rbd = _RecursiveFindBD(disk)
1285   if rbd is None:
1286     return rbd
1287   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1288
1289
1290 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1291   """Write a file to the filesystem.
1292
1293   This allows the master to overwrite(!) a file. It will only perform
1294   the operation if the file belongs to a list of configuration files.
1295
1296   @type file_name: str
1297   @param file_name: the target file name
1298   @type data: str
1299   @param data: the new contents of the file
1300   @type mode: int
1301   @param mode: the mode to give the file (can be None)
1302   @type uid: int
1303   @param uid: the owner of the file (can be -1 for default)
1304   @type gid: int
1305   @param gid: the group of the file (can be -1 for default)
1306   @type atime: float
1307   @param atime: the atime to set on the file (can be None)
1308   @type mtime: float
1309   @param mtime: the mtime to set on the file (can be None)
1310   @rtype: boolean
1311   @return: the success of the operation; errors are logged
1312       in the node daemon log
1313
1314   """
1315   if not os.path.isabs(file_name):
1316     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1317                   file_name)
1318     return False
1319
1320   allowed_files = [
1321     constants.CLUSTER_CONF_FILE,
1322     constants.ETC_HOSTS,
1323     constants.SSH_KNOWN_HOSTS_FILE,
1324     constants.VNC_PASSWORD_FILE,
1325     ]
1326
1327   if file_name not in allowed_files:
1328     logging.error("Filename passed to UploadFile not in allowed"
1329                  " upload targets: '%s'", file_name)
1330     return False
1331
1332   raw_data = _Decompress(data)
1333
1334   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1335                   atime=atime, mtime=mtime)
1336   return True
1337
1338
1339 def WriteSsconfFiles(values):
1340   """Update all ssconf files.
1341
1342   Wrapper around the SimpleStore.WriteFiles.
1343
1344   """
1345   ssconf.SimpleStore().WriteFiles(values)
1346
1347
1348 def _ErrnoOrStr(err):
1349   """Format an EnvironmentError exception.
1350
1351   If the L{err} argument has an errno attribute, it will be looked up
1352   and converted into a textual C{E...} description. Otherwise the
1353   string representation of the error will be returned.
1354
1355   @type err: L{EnvironmentError}
1356   @param err: the exception to format
1357
1358   """
1359   if hasattr(err, 'errno'):
1360     detail = errno.errorcode[err.errno]
1361   else:
1362     detail = str(err)
1363   return detail
1364
1365
1366 def _OSOndiskVersion(name, os_dir):
1367   """Compute and return the API version of a given OS.
1368
1369   This function will try to read the API version of the OS given by
1370   the 'name' parameter and residing in the 'os_dir' directory.
1371
1372   @type name: str
1373   @param name: the OS name we should look for
1374   @type os_dir: str
1375   @param os_dir: the directory inwhich we should look for the OS
1376   @rtype: int or None
1377   @return:
1378       Either an integer denoting the version or None in the
1379       case when this is not a valid OS name.
1380   @raise errors.InvalidOS: if the OS cannot be found
1381
1382   """
1383   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1384
1385   try:
1386     st = os.stat(api_file)
1387   except EnvironmentError, err:
1388     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1389                            " found (%s)" % _ErrnoOrStr(err))
1390
1391   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1392     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1393                            " a regular file")
1394
1395   try:
1396     f = open(api_file)
1397     try:
1398       api_versions = f.readlines()
1399     finally:
1400       f.close()
1401   except EnvironmentError, err:
1402     raise errors.InvalidOS(name, os_dir, "error while reading the"
1403                            " API version (%s)" % _ErrnoOrStr(err))
1404
1405   api_versions = [version.strip() for version in api_versions]
1406   try:
1407     api_versions = [int(version) for version in api_versions]
1408   except (TypeError, ValueError), err:
1409     raise errors.InvalidOS(name, os_dir,
1410                            "API version is not integer (%s)" % str(err))
1411
1412   return api_versions
1413
1414
1415 def DiagnoseOS(top_dirs=None):
1416   """Compute the validity for all OSes.
1417
1418   @type top_dirs: list
1419   @param top_dirs: the list of directories in which to
1420       search (if not given defaults to
1421       L{constants.OS_SEARCH_PATH})
1422   @rtype: list of L{objects.OS}
1423   @return: an OS object for each name in all the given
1424       directories
1425
1426   """
1427   if top_dirs is None:
1428     top_dirs = constants.OS_SEARCH_PATH
1429
1430   result = []
1431   for dir_name in top_dirs:
1432     if os.path.isdir(dir_name):
1433       try:
1434         f_names = utils.ListVisibleFiles(dir_name)
1435       except EnvironmentError, err:
1436         logging.exception("Can't list the OS directory %s", dir_name)
1437         break
1438       for name in f_names:
1439         try:
1440           os_inst = OSFromDisk(name, base_dir=dir_name)
1441           result.append(os_inst)
1442         except errors.InvalidOS, err:
1443           result.append(objects.OS.FromInvalidOS(err))
1444
1445   return result
1446
1447
1448 def OSFromDisk(name, base_dir=None):
1449   """Create an OS instance from disk.
1450
1451   This function will return an OS instance if the given name is a
1452   valid OS name. Otherwise, it will raise an appropriate
1453   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1454
1455   @type base_dir: string
1456   @keyword base_dir: Base directory containing OS installations.
1457                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1458   @rtype: L{objects.OS}
1459   @return: the OS instance if we find a valid one
1460   @raise errors.InvalidOS: if we don't find a valid OS
1461
1462   """
1463   if base_dir is None:
1464     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1465     if os_dir is None:
1466       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1467   else:
1468     os_dir = os.path.sep.join([base_dir, name])
1469
1470   api_versions = _OSOndiskVersion(name, os_dir)
1471
1472   if constants.OS_API_VERSION not in api_versions:
1473     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1474                            " (found %s want %s)"
1475                            % (api_versions, constants.OS_API_VERSION))
1476
1477   # OS Scripts dictionary, we will populate it with the actual script names
1478   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1479
1480   for script in os_scripts:
1481     os_scripts[script] = os.path.sep.join([os_dir, script])
1482
1483     try:
1484       st = os.stat(os_scripts[script])
1485     except EnvironmentError, err:
1486       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1487                              (script, _ErrnoOrStr(err)))
1488
1489     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1490       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1491                              script)
1492
1493     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1494       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1495                              script)
1496
1497
1498   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1499                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1500                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1501                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1502                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1503                     api_versions=api_versions)
1504
1505 def OSEnvironment(instance, debug=0):
1506   """Calculate the environment for an os script.
1507
1508   @type instance: L{objects.Instance}
1509   @param instance: target instance for the os script run
1510   @type debug: integer
1511   @param debug: debug level (0 or 1, for OS Api 10)
1512   @rtype: dict
1513   @return: dict of environment variables
1514   @raise errors.BlockDeviceError: if the block device
1515       cannot be found
1516
1517   """
1518   result = {}
1519   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1520   result['INSTANCE_NAME'] = instance.name
1521   result['HYPERVISOR'] = instance.hypervisor
1522   result['DISK_COUNT'] = '%d' % len(instance.disks)
1523   result['NIC_COUNT'] = '%d' % len(instance.nics)
1524   result['DEBUG_LEVEL'] = '%d' % debug
1525   for idx, disk in enumerate(instance.disks):
1526     real_disk = _RecursiveFindBD(disk)
1527     if real_disk is None:
1528       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1529                                     str(disk))
1530     real_disk.Open()
1531     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1532     # FIXME: When disks will have read-only mode, populate this
1533     result['DISK_%d_ACCESS' % idx] = 'W'
1534     if constants.HV_DISK_TYPE in instance.hvparams:
1535       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1536         instance.hvparams[constants.HV_DISK_TYPE]
1537     if disk.dev_type in constants.LDS_BLOCK:
1538       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1539     elif disk.dev_type == constants.LD_FILE:
1540       result['DISK_%d_BACKEND_TYPE' % idx] = \
1541         'file:%s' % disk.physical_id[0]
1542   for idx, nic in enumerate(instance.nics):
1543     result['NIC_%d_MAC' % idx] = nic.mac
1544     if nic.ip:
1545       result['NIC_%d_IP' % idx] = nic.ip
1546     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1547     if constants.HV_NIC_TYPE in instance.hvparams:
1548       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1549         instance.hvparams[constants.HV_NIC_TYPE]
1550
1551   return result
1552
1553 def GrowBlockDevice(disk, amount):
1554   """Grow a stack of block devices.
1555
1556   This function is called recursively, with the childrens being the
1557   first ones to resize.
1558
1559   @type disk: L{objects.Disk}
1560   @param disk: the disk to be grown
1561   @rtype: (status, result)
1562   @return: a tuple with the status of the operation
1563       (True/False), and the errors message if status
1564       is False
1565
1566   """
1567   r_dev = _RecursiveFindBD(disk)
1568   if r_dev is None:
1569     return False, "Cannot find block device %s" % (disk,)
1570
1571   try:
1572     r_dev.Grow(amount)
1573   except errors.BlockDeviceError, err:
1574     return False, str(err)
1575
1576   return True, None
1577
1578
1579 def SnapshotBlockDevice(disk):
1580   """Create a snapshot copy of a block device.
1581
1582   This function is called recursively, and the snapshot is actually created
1583   just for the leaf lvm backend device.
1584
1585   @type disk: L{objects.Disk}
1586   @param disk: the disk to be snapshotted
1587   @rtype: string
1588   @return: snapshot disk path
1589
1590   """
1591   if disk.children:
1592     if len(disk.children) == 1:
1593       # only one child, let's recurse on it
1594       return SnapshotBlockDevice(disk.children[0])
1595     else:
1596       # more than one child, choose one that matches
1597       for child in disk.children:
1598         if child.size == disk.size:
1599           # return implies breaking the loop
1600           return SnapshotBlockDevice(child)
1601   elif disk.dev_type == constants.LD_LV:
1602     r_dev = _RecursiveFindBD(disk)
1603     if r_dev is not None:
1604       # let's stay on the safe side and ask for the full size, for now
1605       return r_dev.Snapshot(disk.size)
1606     else:
1607       return None
1608   else:
1609     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1610                                  " '%s' of type '%s'" %
1611                                  (disk.unique_id, disk.dev_type))
1612
1613
1614 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1615   """Export a block device snapshot to a remote node.
1616
1617   @type disk: L{objects.Disk}
1618   @param disk: the description of the disk to export
1619   @type dest_node: str
1620   @param dest_node: the destination node to export to
1621   @type instance: L{objects.Instance}
1622   @param instance: the instance object to whom the disk belongs
1623   @type cluster_name: str
1624   @param cluster_name: the cluster name, needed for SSH hostalias
1625   @type idx: int
1626   @param idx: the index of the disk in the instance's disk list,
1627       used to export to the OS scripts environment
1628   @rtype: boolean
1629   @return: the success of the operation
1630
1631   """
1632   export_env = OSEnvironment(instance)
1633
1634   inst_os = OSFromDisk(instance.os)
1635   export_script = inst_os.export_script
1636
1637   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1638                                      instance.name, int(time.time()))
1639   if not os.path.exists(constants.LOG_OS_DIR):
1640     os.mkdir(constants.LOG_OS_DIR, 0750)
1641   real_disk = _RecursiveFindBD(disk)
1642   if real_disk is None:
1643     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1644                                   str(disk))
1645   real_disk.Open()
1646
1647   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1648   export_env['EXPORT_INDEX'] = str(idx)
1649
1650   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1651   destfile = disk.physical_id[1]
1652
1653   # the target command is built out of three individual commands,
1654   # which are joined by pipes; we check each individual command for
1655   # valid parameters
1656   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1657                                export_script, logfile)
1658
1659   comprcmd = "gzip"
1660
1661   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1662                                 destdir, destdir, destfile)
1663   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1664                                                    constants.GANETI_RUNAS,
1665                                                    destcmd)
1666
1667   # all commands have been checked, so we're safe to combine them
1668   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1669
1670   result = utils.RunCmd(command, env=export_env)
1671
1672   if result.failed:
1673     logging.error("os snapshot export command '%s' returned error: %s"
1674                   " output: %s", command, result.fail_reason, result.output)
1675     return False
1676
1677   return True
1678
1679
1680 def FinalizeExport(instance, snap_disks):
1681   """Write out the export configuration information.
1682
1683   @type instance: L{objects.Instance}
1684   @param instance: the instance which we export, used for
1685       saving configuration
1686   @type snap_disks: list of L{objects.Disk}
1687   @param snap_disks: list of snapshot block devices, which
1688       will be used to get the actual name of the dump file
1689
1690   @rtype: boolean
1691   @return: the success of the operation
1692
1693   """
1694   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1695   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1696
1697   config = objects.SerializableConfigParser()
1698
1699   config.add_section(constants.INISECT_EXP)
1700   config.set(constants.INISECT_EXP, 'version', '0')
1701   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1702   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1703   config.set(constants.INISECT_EXP, 'os', instance.os)
1704   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1705
1706   config.add_section(constants.INISECT_INS)
1707   config.set(constants.INISECT_INS, 'name', instance.name)
1708   config.set(constants.INISECT_INS, 'memory', '%d' %
1709              instance.beparams[constants.BE_MEMORY])
1710   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1711              instance.beparams[constants.BE_VCPUS])
1712   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1713
1714   nic_total = 0
1715   for nic_count, nic in enumerate(instance.nics):
1716     nic_total += 1
1717     config.set(constants.INISECT_INS, 'nic%d_mac' %
1718                nic_count, '%s' % nic.mac)
1719     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1720     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1721                '%s' % nic.bridge)
1722   # TODO: redundant: on load can read nics until it doesn't exist
1723   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1724
1725   disk_total = 0
1726   for disk_count, disk in enumerate(snap_disks):
1727     if disk:
1728       disk_total += 1
1729       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1730                  ('%s' % disk.iv_name))
1731       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1732                  ('%s' % disk.physical_id[1]))
1733       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1734                  ('%d' % disk.size))
1735
1736   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1737
1738   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1739                   data=config.Dumps())
1740   shutil.rmtree(finaldestdir, True)
1741   shutil.move(destdir, finaldestdir)
1742
1743   return True
1744
1745
1746 def ExportInfo(dest):
1747   """Get export configuration information.
1748
1749   @type dest: str
1750   @param dest: directory containing the export
1751
1752   @rtype: L{objects.SerializableConfigParser}
1753   @return: a serializable config file containing the
1754       export info
1755
1756   """
1757   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1758
1759   config = objects.SerializableConfigParser()
1760   config.read(cff)
1761
1762   if (not config.has_section(constants.INISECT_EXP) or
1763       not config.has_section(constants.INISECT_INS)):
1764     return None
1765
1766   return config
1767
1768
1769 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1770   """Import an os image into an instance.
1771
1772   @type instance: L{objects.Instance}
1773   @param instance: instance to import the disks into
1774   @type src_node: string
1775   @param src_node: source node for the disk images
1776   @type src_images: list of string
1777   @param src_images: absolute paths of the disk images
1778   @rtype: list of boolean
1779   @return: each boolean represent the success of importing the n-th disk
1780
1781   """
1782   import_env = OSEnvironment(instance)
1783   inst_os = OSFromDisk(instance.os)
1784   import_script = inst_os.import_script
1785
1786   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1787                                         instance.name, int(time.time()))
1788   if not os.path.exists(constants.LOG_OS_DIR):
1789     os.mkdir(constants.LOG_OS_DIR, 0750)
1790
1791   comprcmd = "gunzip"
1792   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1793                                import_script, logfile)
1794
1795   final_result = []
1796   for idx, image in enumerate(src_images):
1797     if image:
1798       destcmd = utils.BuildShellCmd('cat %s', image)
1799       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1800                                                        constants.GANETI_RUNAS,
1801                                                        destcmd)
1802       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1803       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1804       import_env['IMPORT_INDEX'] = str(idx)
1805       result = utils.RunCmd(command, env=import_env)
1806       if result.failed:
1807         logging.error("Disk import command '%s' returned error: %s"
1808                       " output: %s", command, result.fail_reason,
1809                       result.output)
1810         final_result.append(False)
1811       else:
1812         final_result.append(True)
1813     else:
1814       final_result.append(True)
1815
1816   return final_result
1817
1818
1819 def ListExports():
1820   """Return a list of exports currently available on this machine.
1821
1822   @rtype: list
1823   @return: list of the exports
1824
1825   """
1826   if os.path.isdir(constants.EXPORT_DIR):
1827     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1828   else:
1829     return []
1830
1831
1832 def RemoveExport(export):
1833   """Remove an existing export from the node.
1834
1835   @type export: str
1836   @param export: the name of the export to remove
1837   @rtype: boolean
1838   @return: the success of the operation
1839
1840   """
1841   target = os.path.join(constants.EXPORT_DIR, export)
1842
1843   shutil.rmtree(target)
1844   # TODO: catch some of the relevant exceptions and provide a pretty
1845   # error message if rmtree fails.
1846
1847   return True
1848
1849
1850 def RenameBlockDevices(devlist):
1851   """Rename a list of block devices.
1852
1853   @type devlist: list of tuples
1854   @param devlist: list of tuples of the form  (disk,
1855       new_logical_id, new_physical_id); disk is an
1856       L{objects.Disk} object describing the current disk,
1857       and new logical_id/physical_id is the name we
1858       rename it to
1859   @rtype: boolean
1860   @return: True if all renames succeeded, False otherwise
1861
1862   """
1863   result = True
1864   for disk, unique_id in devlist:
1865     dev = _RecursiveFindBD(disk)
1866     if dev is None:
1867       result = False
1868       continue
1869     try:
1870       old_rpath = dev.dev_path
1871       dev.Rename(unique_id)
1872       new_rpath = dev.dev_path
1873       if old_rpath != new_rpath:
1874         DevCacheManager.RemoveCache(old_rpath)
1875         # FIXME: we should add the new cache information here, like:
1876         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1877         # but we don't have the owner here - maybe parse from existing
1878         # cache? for now, we only lose lvm data when we rename, which
1879         # is less critical than DRBD or MD
1880     except errors.BlockDeviceError, err:
1881       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1882       result = False
1883   return result
1884
1885
1886 def _TransformFileStorageDir(file_storage_dir):
1887   """Checks whether given file_storage_dir is valid.
1888
1889   Checks wheter the given file_storage_dir is within the cluster-wide
1890   default file_storage_dir stored in SimpleStore. Only paths under that
1891   directory are allowed.
1892
1893   @type file_storage_dir: str
1894   @param file_storage_dir: the path to check
1895
1896   @return: the normalized path if valid, None otherwise
1897
1898   """
1899   cfg = _GetConfig()
1900   file_storage_dir = os.path.normpath(file_storage_dir)
1901   base_file_storage_dir = cfg.GetFileStorageDir()
1902   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1903       base_file_storage_dir):
1904     logging.error("file storage directory '%s' is not under base file"
1905                   " storage directory '%s'",
1906                   file_storage_dir, base_file_storage_dir)
1907     return None
1908   return file_storage_dir
1909
1910
1911 def CreateFileStorageDir(file_storage_dir):
1912   """Create file storage directory.
1913
1914   @type file_storage_dir: str
1915   @param file_storage_dir: directory to create
1916
1917   @rtype: tuple
1918   @return: tuple with first element a boolean indicating wheter dir
1919       creation was successful or not
1920
1921   """
1922   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1923   result = True,
1924   if not file_storage_dir:
1925     result = False,
1926   else:
1927     if os.path.exists(file_storage_dir):
1928       if not os.path.isdir(file_storage_dir):
1929         logging.error("'%s' is not a directory", file_storage_dir)
1930         result = False,
1931     else:
1932       try:
1933         os.makedirs(file_storage_dir, 0750)
1934       except OSError, err:
1935         logging.error("Cannot create file storage directory '%s': %s",
1936                       file_storage_dir, err)
1937         result = False,
1938   return result
1939
1940
1941 def RemoveFileStorageDir(file_storage_dir):
1942   """Remove file storage directory.
1943
1944   Remove it only if it's empty. If not log an error and return.
1945
1946   @type file_storage_dir: str
1947   @param file_storage_dir: the directory we should cleanup
1948   @rtype: tuple (success,)
1949   @return: tuple of one element, C{success}, denoting
1950       whether the operation was successfull
1951
1952   """
1953   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1954   result = True,
1955   if not file_storage_dir:
1956     result = False,
1957   else:
1958     if os.path.exists(file_storage_dir):
1959       if not os.path.isdir(file_storage_dir):
1960         logging.error("'%s' is not a directory", file_storage_dir)
1961         result = False,
1962       # deletes dir only if empty, otherwise we want to return False
1963       try:
1964         os.rmdir(file_storage_dir)
1965       except OSError, err:
1966         logging.exception("Cannot remove file storage directory '%s'",
1967                           file_storage_dir)
1968         result = False,
1969   return result
1970
1971
1972 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1973   """Rename the file storage directory.
1974
1975   @type old_file_storage_dir: str
1976   @param old_file_storage_dir: the current path
1977   @type new_file_storage_dir: str
1978   @param new_file_storage_dir: the name we should rename to
1979   @rtype: tuple (success,)
1980   @return: tuple of one element, C{success}, denoting
1981       whether the operation was successful
1982
1983   """
1984   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1985   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1986   result = True,
1987   if not old_file_storage_dir or not new_file_storage_dir:
1988     result = False,
1989   else:
1990     if not os.path.exists(new_file_storage_dir):
1991       if os.path.isdir(old_file_storage_dir):
1992         try:
1993           os.rename(old_file_storage_dir, new_file_storage_dir)
1994         except OSError, err:
1995           logging.exception("Cannot rename '%s' to '%s'",
1996                             old_file_storage_dir, new_file_storage_dir)
1997           result =  False,
1998       else:
1999         logging.error("'%s' is not a directory", old_file_storage_dir)
2000         result = False,
2001     else:
2002       if os.path.exists(old_file_storage_dir):
2003         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2004                       old_file_storage_dir, new_file_storage_dir)
2005         result = False,
2006   return result
2007
2008
2009 def _IsJobQueueFile(file_name):
2010   """Checks whether the given filename is in the queue directory.
2011
2012   @type file_name: str
2013   @param file_name: the file name we should check
2014   @rtype: boolean
2015   @return: whether the file is under the queue directory
2016
2017   """
2018   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2019   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2020
2021   if not result:
2022     logging.error("'%s' is not a file in the queue directory",
2023                   file_name)
2024
2025   return result
2026
2027
2028 def JobQueueUpdate(file_name, content):
2029   """Updates a file in the queue directory.
2030
2031   This is just a wrapper over L{utils.WriteFile}, with proper
2032   checking.
2033
2034   @type file_name: str
2035   @param file_name: the job file name
2036   @type content: str
2037   @param content: the new job contents
2038   @rtype: boolean
2039   @return: the success of the operation
2040
2041   """
2042   if not _IsJobQueueFile(file_name):
2043     return False
2044
2045   # Write and replace the file atomically
2046   utils.WriteFile(file_name, data=_Decompress(content))
2047
2048   return True
2049
2050
2051 def JobQueueRename(old, new):
2052   """Renames a job queue file.
2053
2054   This is just a wrapper over os.rename with proper checking.
2055
2056   @type old: str
2057   @param old: the old (actual) file name
2058   @type new: str
2059   @param new: the desired file name
2060   @rtype: boolean
2061   @return: the success of the operation
2062
2063   """
2064   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2065     return False
2066
2067   utils.RenameFile(old, new, mkdir=True)
2068
2069   return True
2070
2071
2072 def JobQueueSetDrainFlag(drain_flag):
2073   """Set the drain flag for the queue.
2074
2075   This will set or unset the queue drain flag.
2076
2077   @type drain_flag: boolean
2078   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2079   @rtype: boolean
2080   @return: always True
2081   @warning: the function always returns True
2082
2083   """
2084   if drain_flag:
2085     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2086   else:
2087     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2088
2089   return True
2090
2091
2092 def CloseBlockDevices(instance_name, disks):
2093   """Closes the given block devices.
2094
2095   This means they will be switched to secondary mode (in case of
2096   DRBD).
2097
2098   @param instance_name: if the argument is not empty, the symlinks
2099       of this instance will be removed
2100   @type disks: list of L{objects.Disk}
2101   @param disks: the list of disks to be closed
2102   @rtype: tuple (success, message)
2103   @return: a tuple of success and message, where success
2104       indicates the succes of the operation, and message
2105       which will contain the error details in case we
2106       failed
2107
2108   """
2109   bdevs = []
2110   for cf in disks:
2111     rd = _RecursiveFindBD(cf)
2112     if rd is None:
2113       return (False, "Can't find device %s" % cf)
2114     bdevs.append(rd)
2115
2116   msg = []
2117   for rd in bdevs:
2118     try:
2119       rd.Close()
2120     except errors.BlockDeviceError, err:
2121       msg.append(str(err))
2122   if msg:
2123     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2124   else:
2125     if instance_name:
2126       _RemoveBlockDevLinks(instance_name, disks)
2127     return (True, "All devices secondary")
2128
2129
2130 def ValidateHVParams(hvname, hvparams):
2131   """Validates the given hypervisor parameters.
2132
2133   @type hvname: string
2134   @param hvname: the hypervisor name
2135   @type hvparams: dict
2136   @param hvparams: the hypervisor parameters to be validated
2137   @rtype: tuple (success, message)
2138   @return: a tuple of success and message, where success
2139       indicates the succes of the operation, and message
2140       which will contain the error details in case we
2141       failed
2142
2143   """
2144   try:
2145     hv_type = hypervisor.GetHypervisor(hvname)
2146     hv_type.ValidateParameters(hvparams)
2147     return (True, "Validation passed")
2148   except errors.HypervisorError, err:
2149     return (False, str(err))
2150
2151
2152 def DemoteFromMC():
2153   """Demotes the current node from master candidate role.
2154
2155   """
2156   # try to ensure we're not the master by mistake
2157   master, myself = ssconf.GetMasterAndMyself()
2158   if master == myself:
2159     return (False, "ssconf status shows I'm the master node, will not demote")
2160   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2161   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2162     return (False, "The master daemon is running, will not demote")
2163   try:
2164     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2165   except EnvironmentError, err:
2166     if err.errno != errno.ENOENT:
2167       return (False, "Error while backing up cluster file: %s" % str(err))
2168   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2169   return (True, "Done")
2170
2171
2172 def _FindDisks(nodes_ip, disks):
2173   """Sets the physical ID on disks and returns the block devices.
2174
2175   """
2176   # set the correct physical ID
2177   my_name = utils.HostInfo().name
2178   for cf in disks:
2179     cf.SetPhysicalID(my_name, nodes_ip)
2180
2181   bdevs = []
2182
2183   for cf in disks:
2184     rd = _RecursiveFindBD(cf)
2185     if rd is None:
2186       return (False, "Can't find device %s" % cf)
2187     bdevs.append(rd)
2188   return (True, bdevs)
2189
2190
2191 def DrbdDisconnectNet(nodes_ip, disks):
2192   """Disconnects the network on a list of drbd devices.
2193
2194   """
2195   status, bdevs = _FindDisks(nodes_ip, disks)
2196   if not status:
2197     return status, bdevs
2198
2199   # disconnect disks
2200   for rd in bdevs:
2201     try:
2202       rd.DisconnectNet()
2203     except errors.BlockDeviceError, err:
2204       logging.exception("Failed to go into standalone mode")
2205       return (False, "Can't change network configuration: %s" % str(err))
2206   return (True, "All disks are now disconnected")
2207
2208
2209 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2210   """Attaches the network on a list of drbd devices.
2211
2212   """
2213   status, bdevs = _FindDisks(nodes_ip, disks)
2214   if not status:
2215     return status, bdevs
2216
2217   if multimaster:
2218     for idx, rd in enumerate(bdevs):
2219       try:
2220         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2221       except EnvironmentError, err:
2222         return (False, "Can't create symlink: %s" % str(err))
2223   # reconnect disks, switch to new master configuration and if
2224   # needed primary mode
2225   for rd in bdevs:
2226     try:
2227       rd.AttachNet(multimaster)
2228     except errors.BlockDeviceError, err:
2229       return (False, "Can't change network configuration: %s" % str(err))
2230   # wait until the disks are connected; we need to retry the re-attach
2231   # if the device becomes standalone, as this might happen if the one
2232   # node disconnects and reconnects in a different mode before the
2233   # other node reconnects; in this case, one or both of the nodes will
2234   # decide it has wrong configuration and switch to standalone
2235   RECONNECT_TIMEOUT = 2 * 60
2236   sleep_time = 0.100 # start with 100 miliseconds
2237   timeout_limit = time.time() + RECONNECT_TIMEOUT
2238   while time.time() < timeout_limit:
2239     all_connected = True
2240     for rd in bdevs:
2241       stats = rd.GetProcStatus()
2242       if not (stats.is_connected or stats.is_in_resync):
2243         all_connected = False
2244       if stats.is_standalone:
2245         # peer had different config info and this node became
2246         # standalone, even though this should not happen with the
2247         # new staged way of changing disk configs
2248         try:
2249           rd.ReAttachNet(multimaster)
2250         except errors.BlockDeviceError, err:
2251           return (False, "Can't change network configuration: %s" % str(err))
2252     if all_connected:
2253       break
2254     time.sleep(sleep_time)
2255     sleep_time = min(5, sleep_time * 1.5)
2256   if not all_connected:
2257     return (False, "Timeout in disk reconnecting")
2258   if multimaster:
2259     # change to primary mode
2260     for rd in bdevs:
2261       rd.Open()
2262   if multimaster:
2263     msg = "multi-master and primary"
2264   else:
2265     msg = "single-master"
2266   return (True, "Disks are now configured as %s" % msg)
2267
2268
2269 def DrbdWaitSync(nodes_ip, disks):
2270   """Wait until DRBDs have synchronized.
2271
2272   """
2273   status, bdevs = _FindDisks(nodes_ip, disks)
2274   if not status:
2275     return status, bdevs
2276
2277   min_resync = 100
2278   alldone = True
2279   failure = False
2280   for rd in bdevs:
2281     stats = rd.GetProcStatus()
2282     if not (stats.is_connected or stats.is_in_resync):
2283       failure = True
2284       break
2285     alldone = alldone and (not stats.is_in_resync)
2286     if stats.sync_percent is not None:
2287       min_resync = min(min_resync, stats.sync_percent)
2288   return (not failure, (alldone, min_resync))
2289
2290
2291 class HooksRunner(object):
2292   """Hook runner.
2293
2294   This class is instantiated on the node side (ganeti-noded) and not
2295   on the master side.
2296
2297   """
2298   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2299
2300   def __init__(self, hooks_base_dir=None):
2301     """Constructor for hooks runner.
2302
2303     @type hooks_base_dir: str or None
2304     @param hooks_base_dir: if not None, this overrides the
2305         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2306
2307     """
2308     if hooks_base_dir is None:
2309       hooks_base_dir = constants.HOOKS_BASE_DIR
2310     self._BASE_DIR = hooks_base_dir
2311
2312   @staticmethod
2313   def ExecHook(script, env):
2314     """Exec one hook script.
2315
2316     @type script: str
2317     @param script: the full path to the script
2318     @type env: dict
2319     @param env: the environment with which to exec the script
2320     @rtype: tuple (success, message)
2321     @return: a tuple of success and message, where success
2322         indicates the succes of the operation, and message
2323         which will contain the error details in case we
2324         failed
2325
2326     """
2327     # exec the process using subprocess and log the output
2328     fdstdin = None
2329     try:
2330       fdstdin = open("/dev/null", "r")
2331       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2332                                stderr=subprocess.STDOUT, close_fds=True,
2333                                shell=False, cwd="/", env=env)
2334       output = ""
2335       try:
2336         output = child.stdout.read(4096)
2337         child.stdout.close()
2338       except EnvironmentError, err:
2339         output += "Hook script error: %s" % str(err)
2340
2341       while True:
2342         try:
2343           result = child.wait()
2344           break
2345         except EnvironmentError, err:
2346           if err.errno == errno.EINTR:
2347             continue
2348           raise
2349     finally:
2350       # try not to leak fds
2351       for fd in (fdstdin, ):
2352         if fd is not None:
2353           try:
2354             fd.close()
2355           except EnvironmentError, err:
2356             # just log the error
2357             #logging.exception("Error while closing fd %s", fd)
2358             pass
2359
2360     return result == 0, output
2361
2362   def RunHooks(self, hpath, phase, env):
2363     """Run the scripts in the hooks directory.
2364
2365     @type hpath: str
2366     @param hpath: the path to the hooks directory which
2367         holds the scripts
2368     @type phase: str
2369     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2370         L{constants.HOOKS_PHASE_POST}
2371     @type env: dict
2372     @param env: dictionary with the environment for the hook
2373     @rtype: list
2374     @return: list of 3-element tuples:
2375       - script path
2376       - script result, either L{constants.HKR_SUCCESS} or
2377         L{constants.HKR_FAIL}
2378       - output of the script
2379
2380     @raise errors.ProgrammerError: for invalid input
2381         parameters
2382
2383     """
2384     if phase == constants.HOOKS_PHASE_PRE:
2385       suffix = "pre"
2386     elif phase == constants.HOOKS_PHASE_POST:
2387       suffix = "post"
2388     else:
2389       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2390     rr = []
2391
2392     subdir = "%s-%s.d" % (hpath, suffix)
2393     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2394     try:
2395       dir_contents = utils.ListVisibleFiles(dir_name)
2396     except OSError, err:
2397       # FIXME: must log output in case of failures
2398       return rr
2399
2400     # we use the standard python sort order,
2401     # so 00name is the recommended naming scheme
2402     dir_contents.sort()
2403     for relname in dir_contents:
2404       fname = os.path.join(dir_name, relname)
2405       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2406           self.RE_MASK.match(relname) is not None):
2407         rrval = constants.HKR_SKIP
2408         output = ""
2409       else:
2410         result, output = self.ExecHook(fname, env)
2411         if not result:
2412           rrval = constants.HKR_FAIL
2413         else:
2414           rrval = constants.HKR_SUCCESS
2415       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2416
2417     return rr
2418
2419
2420 class IAllocatorRunner(object):
2421   """IAllocator runner.
2422
2423   This class is instantiated on the node side (ganeti-noded) and not on
2424   the master side.
2425
2426   """
2427   def Run(self, name, idata):
2428     """Run an iallocator script.
2429
2430     @type name: str
2431     @param name: the iallocator script name
2432     @type idata: str
2433     @param idata: the allocator input data
2434
2435     @rtype: tuple
2436     @return: four element tuple of:
2437        - run status (one of the IARUN_ constants)
2438        - stdout
2439        - stderr
2440        - fail reason (as from L{utils.RunResult})
2441
2442     """
2443     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2444                                   os.path.isfile)
2445     if alloc_script is None:
2446       return (constants.IARUN_NOTFOUND, None, None, None)
2447
2448     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2449     try:
2450       os.write(fd, idata)
2451       os.close(fd)
2452       result = utils.RunCmd([alloc_script, fin_name])
2453       if result.failed:
2454         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2455                 result.fail_reason)
2456     finally:
2457       os.unlink(fin_name)
2458
2459     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2460
2461
2462 class DevCacheManager(object):
2463   """Simple class for managing a cache of block device information.
2464
2465   """
2466   _DEV_PREFIX = "/dev/"
2467   _ROOT_DIR = constants.BDEV_CACHE_DIR
2468
2469   @classmethod
2470   def _ConvertPath(cls, dev_path):
2471     """Converts a /dev/name path to the cache file name.
2472
2473     This replaces slashes with underscores and strips the /dev
2474     prefix. It then returns the full path to the cache file.
2475
2476     @type dev_path: str
2477     @param dev_path: the C{/dev/} path name
2478     @rtype: str
2479     @return: the converted path name
2480
2481     """
2482     if dev_path.startswith(cls._DEV_PREFIX):
2483       dev_path = dev_path[len(cls._DEV_PREFIX):]
2484     dev_path = dev_path.replace("/", "_")
2485     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2486     return fpath
2487
2488   @classmethod
2489   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2490     """Updates the cache information for a given device.
2491
2492     @type dev_path: str
2493     @param dev_path: the pathname of the device
2494     @type owner: str
2495     @param owner: the owner (instance name) of the device
2496     @type on_primary: bool
2497     @param on_primary: whether this is the primary
2498         node nor not
2499     @type iv_name: str
2500     @param iv_name: the instance-visible name of the
2501         device, as in objects.Disk.iv_name
2502
2503     @rtype: None
2504
2505     """
2506     if dev_path is None:
2507       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2508       return
2509     fpath = cls._ConvertPath(dev_path)
2510     if on_primary:
2511       state = "primary"
2512     else:
2513       state = "secondary"
2514     if iv_name is None:
2515       iv_name = "not_visible"
2516     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2517     try:
2518       utils.WriteFile(fpath, data=fdata)
2519     except EnvironmentError, err:
2520       logging.exception("Can't update bdev cache for %s", dev_path)
2521
2522   @classmethod
2523   def RemoveCache(cls, dev_path):
2524     """Remove data for a dev_path.
2525
2526     This is just a wrapper over L{utils.RemoveFile} with a converted
2527     path name and logging.
2528
2529     @type dev_path: str
2530     @param dev_path: the pathname of the device
2531
2532     @rtype: None
2533
2534     """
2535     if dev_path is None:
2536       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2537       return
2538     fpath = cls._ConvertPath(dev_path)
2539     try:
2540       utils.RemoveFile(fpath)
2541     except EnvironmentError, err:
2542       logging.exception("Can't update bdev cache for %s", dev_path)