mcpu: Use new timeout class for timeout
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55
56
57 class RPCFail(Exception):
58   """Class denoting RPC failure.
59
60   Its argument is the error message.
61
62   """
63
64
65 def _Fail(msg, *args, **kwargs):
66   """Log an error and the raise an RPCFail exception.
67
68   This exception is then handled specially in the ganeti daemon and
69   turned into a 'failed' return type. As such, this function is a
70   useful shortcut for logging the error and returning it to the master
71   daemon.
72
73   @type msg: string
74   @param msg: the text of the exception
75   @raise RPCFail
76
77   """
78   if args:
79     msg = msg % args
80   if "log" not in kwargs or kwargs["log"]: # if we should log this error
81     if "exc" in kwargs and kwargs["exc"]:
82       logging.exception(msg)
83     else:
84       logging.error(msg)
85   raise RPCFail(msg)
86
87
88 def _GetConfig():
89   """Simple wrapper to return a SimpleStore.
90
91   @rtype: L{ssconf.SimpleStore}
92   @return: a SimpleStore instance
93
94   """
95   return ssconf.SimpleStore()
96
97
98 def _GetSshRunner(cluster_name):
99   """Simple wrapper to return an SshRunner.
100
101   @type cluster_name: str
102   @param cluster_name: the cluster name, which is needed
103       by the SshRunner constructor
104   @rtype: L{ssh.SshRunner}
105   @return: an SshRunner instance
106
107   """
108   return ssh.SshRunner(cluster_name)
109
110
111 def _Decompress(data):
112   """Unpacks data compressed by the RPC client.
113
114   @type data: list or tuple
115   @param data: Data sent by RPC client
116   @rtype: str
117   @return: Decompressed data
118
119   """
120   assert isinstance(data, (list, tuple))
121   assert len(data) == 2
122   (encoding, content) = data
123   if encoding == constants.RPC_ENCODING_NONE:
124     return content
125   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126     return zlib.decompress(base64.b64decode(content))
127   else:
128     raise AssertionError("Unknown data encoding")
129
130
131 def _CleanDirectory(path, exclude=None):
132   """Removes all regular files in a directory.
133
134   @type path: str
135   @param path: the directory to clean
136   @type exclude: list
137   @param exclude: list of files to be excluded, defaults
138       to the empty list
139
140   """
141   if not os.path.isdir(path):
142     return
143   if exclude is None:
144     exclude = []
145   else:
146     # Normalize excluded paths
147     exclude = [os.path.normpath(i) for i in exclude]
148
149   for rel_name in utils.ListVisibleFiles(path):
150     full_name = os.path.normpath(os.path.join(path, rel_name))
151     if full_name in exclude:
152       continue
153     if os.path.isfile(full_name) and not os.path.islink(full_name):
154       utils.RemoveFile(full_name)
155
156
157 def _BuildUploadFileList():
158   """Build the list of allowed upload files.
159
160   This is abstracted so that it's built only once at module import time.
161
162   """
163   allowed_files = set([
164     constants.CLUSTER_CONF_FILE,
165     constants.ETC_HOSTS,
166     constants.SSH_KNOWN_HOSTS_FILE,
167     constants.VNC_PASSWORD_FILE,
168     constants.RAPI_CERT_FILE,
169     constants.RAPI_USERS_FILE,
170     constants.HMAC_CLUSTER_KEY,
171     ])
172
173   for hv_name in constants.HYPER_TYPES:
174     hv_class = hypervisor.GetHypervisorClass(hv_name)
175     allowed_files.update(hv_class.GetAncillaryFiles())
176
177   return frozenset(allowed_files)
178
179
180 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181
182
183 def JobQueuePurge():
184   """Removes job queue files and archived jobs.
185
186   @rtype: tuple
187   @return: True, None
188
189   """
190   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192
193
194 def GetMasterInfo():
195   """Returns master information.
196
197   This is an utility function to compute master information, either
198   for consumption here or from the node daemon.
199
200   @rtype: tuple
201   @return: master_netdev, master_ip, master_name
202   @raise RPCFail: in case of errors
203
204   """
205   try:
206     cfg = _GetConfig()
207     master_netdev = cfg.GetMasterNetdev()
208     master_ip = cfg.GetMasterIP()
209     master_node = cfg.GetMasterNode()
210   except errors.ConfigurationError, err:
211     _Fail("Cluster configuration incomplete: %s", err, exc=True)
212   return (master_netdev, master_ip, master_node)
213
214
215 def StartMaster(start_daemons, no_voting):
216   """Activate local node as master node.
217
218   The function will always try activate the IP address of the master
219   (unless someone else has it). It will also start the master daemons,
220   based on the start_daemons parameter.
221
222   @type start_daemons: boolean
223   @param start_daemons: whether to also start the master
224       daemons (ganeti-masterd and ganeti-rapi)
225   @type no_voting: boolean
226   @param no_voting: whether to start ganeti-masterd without a node vote
227       (if start_daemons is True), but still non-interactively
228   @rtype: None
229
230   """
231   # GetMasterInfo will raise an exception if not able to return data
232   master_netdev, master_ip, _ = GetMasterInfo()
233
234   err_msgs = []
235   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236     if utils.OwnIpAddress(master_ip):
237       # we already have the ip:
238       logging.debug("Master IP already configured, doing nothing")
239     else:
240       msg = "Someone else has the master ip, not activating"
241       logging.error(msg)
242       err_msgs.append(msg)
243   else:
244     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245                            "dev", master_netdev, "label",
246                            "%s:0" % master_netdev])
247     if result.failed:
248       msg = "Can't activate master IP: %s" % result.output
249       logging.error(msg)
250       err_msgs.append(msg)
251
252     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253                            "-s", master_ip, master_ip])
254     # we'll ignore the exit code of arping
255
256   # and now start the master and rapi daemons
257   if start_daemons:
258     daemons_params = {
259         'ganeti-masterd': [],
260         'ganeti-rapi': [],
261         }
262     if no_voting:
263       daemons_params['ganeti-masterd'].append('--no-voting')
264       daemons_params['ganeti-masterd'].append('--yes-do-it')
265     for daemon in daemons_params:
266       cmd = [daemon]
267       cmd.extend(daemons_params[daemon])
268       result = utils.RunCmd(cmd)
269       if result.failed:
270         msg = "Can't start daemon %s: %s" % (daemon, result.output)
271         logging.error(msg)
272         err_msgs.append(msg)
273
274   if err_msgs:
275     _Fail("; ".join(err_msgs))
276
277
278 def StopMaster(stop_daemons):
279   """Deactivate this node as master.
280
281   The function will always try to deactivate the IP address of the
282   master. It will also stop the master daemons depending on the
283   stop_daemons parameter.
284
285   @type stop_daemons: boolean
286   @param stop_daemons: whether to also stop the master daemons
287       (ganeti-masterd and ganeti-rapi)
288   @rtype: None
289
290   """
291   # TODO: log and report back to the caller the error failures; we
292   # need to decide in which case we fail the RPC for this
293
294   # GetMasterInfo will raise an exception if not able to return data
295   master_netdev, master_ip, _ = GetMasterInfo()
296
297   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298                          "dev", master_netdev])
299   if result.failed:
300     logging.error("Can't remove the master IP, error: %s", result.output)
301     # but otherwise ignore the failure
302
303   if stop_daemons:
304     # stop/kill the rapi and the master daemon
305     for daemon in constants.RAPI, constants.MASTERD:
306       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307
308
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310   """Joins this node to the cluster.
311
312   This does the following:
313       - updates the hostkeys of the machine (rsa and dsa)
314       - adds the ssh private key to the user
315       - adds the ssh public key to the users' authorized_keys file
316
317   @type dsa: str
318   @param dsa: the DSA private key to write
319   @type dsapub: str
320   @param dsapub: the DSA public key to write
321   @type rsa: str
322   @param rsa: the RSA private key to write
323   @type rsapub: str
324   @param rsapub: the RSA public key to write
325   @type sshkey: str
326   @param sshkey: the SSH private key to write
327   @type sshpub: str
328   @param sshpub: the SSH public key to write
329   @rtype: boolean
330   @return: the success of the operation
331
332   """
333   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337   for name, content, mode in sshd_keys:
338     utils.WriteFile(name, data=content, mode=mode)
339
340   try:
341     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342                                                     mkdir=True)
343   except errors.OpExecError, err:
344     _Fail("Error while processing user ssh files: %s", err, exc=True)
345
346   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347     utils.WriteFile(name, data=content, mode=0600)
348
349   utils.AddAuthorizedKey(auth_keys, sshpub)
350
351   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352
353
354 def LeaveCluster():
355   """Cleans up and remove the current node.
356
357   This function cleans up and prepares the current node to be removed
358   from the cluster.
359
360   If processing is successful, then it raises an
361   L{errors.QuitGanetiException} which is used as a special case to
362   shutdown the node daemon.
363
364   """
365   _CleanDirectory(constants.DATA_DIR)
366   JobQueuePurge()
367
368   try:
369     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370
371     utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372
373     utils.RemoveFile(priv_key)
374     utils.RemoveFile(pub_key)
375   except errors.OpExecError:
376     logging.exception("Error while processing ssh files")
377
378   try:
379     utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
380     utils.RemoveFile(constants.RAPI_CERT_FILE)
381     utils.RemoveFile(constants.SSL_CERT_FILE)
382   except:
383     logging.exception("Error while removing cluster secrets")
384
385   confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
386
387   if confd_pid:
388     utils.KillProcess(confd_pid, timeout=2)
389
390   # Raise a custom exception (handled in ganeti-noded)
391   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392
393
394 def GetNodeInfo(vgname, hypervisor_type):
395   """Gives back a hash with different information about the node.
396
397   @type vgname: C{string}
398   @param vgname: the name of the volume group to ask for disk space information
399   @type hypervisor_type: C{str}
400   @param hypervisor_type: the name of the hypervisor to ask for
401       memory information
402   @rtype: C{dict}
403   @return: dictionary with the following keys:
404       - vg_size is the size of the configured volume group in MiB
405       - vg_free is the free size of the volume group in MiB
406       - memory_dom0 is the memory allocated for domain0 in MiB
407       - memory_free is the currently available (free) ram in MiB
408       - memory_total is the total number of ram in MiB
409
410   """
411   outputarray = {}
412   vginfo = _GetVGInfo(vgname)
413   outputarray['vg_size'] = vginfo['vg_size']
414   outputarray['vg_free'] = vginfo['vg_free']
415
416   hyper = hypervisor.GetHypervisor(hypervisor_type)
417   hyp_info = hyper.GetNodeInfo()
418   if hyp_info is not None:
419     outputarray.update(hyp_info)
420
421   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
422
423   return outputarray
424
425
426 def VerifyNode(what, cluster_name):
427   """Verify the status of the local node.
428
429   Based on the input L{what} parameter, various checks are done on the
430   local node.
431
432   If the I{filelist} key is present, this list of
433   files is checksummed and the file/checksum pairs are returned.
434
435   If the I{nodelist} key is present, we check that we have
436   connectivity via ssh with the target nodes (and check the hostname
437   report).
438
439   If the I{node-net-test} key is present, we check that we have
440   connectivity to the given nodes via both primary IP and, if
441   applicable, secondary IPs.
442
443   @type what: C{dict}
444   @param what: a dictionary of things to check:
445       - filelist: list of files for which to compute checksums
446       - nodelist: list of nodes we should check ssh communication with
447       - node-net-test: list of nodes we should check node daemon port
448         connectivity with
449       - hypervisor: list with hypervisors to run the verify for
450   @rtype: dict
451   @return: a dictionary with the same keys as the input dict, and
452       values representing the result of the checks
453
454   """
455   result = {}
456
457   if constants.NV_HYPERVISOR in what:
458     result[constants.NV_HYPERVISOR] = tmp = {}
459     for hv_name in what[constants.NV_HYPERVISOR]:
460       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
461
462   if constants.NV_FILELIST in what:
463     result[constants.NV_FILELIST] = utils.FingerprintFiles(
464       what[constants.NV_FILELIST])
465
466   if constants.NV_NODELIST in what:
467     result[constants.NV_NODELIST] = tmp = {}
468     random.shuffle(what[constants.NV_NODELIST])
469     for node in what[constants.NV_NODELIST]:
470       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
471       if not success:
472         tmp[node] = message
473
474   if constants.NV_NODENETTEST in what:
475     result[constants.NV_NODENETTEST] = tmp = {}
476     my_name = utils.HostInfo().name
477     my_pip = my_sip = None
478     for name, pip, sip in what[constants.NV_NODENETTEST]:
479       if name == my_name:
480         my_pip = pip
481         my_sip = sip
482         break
483     if not my_pip:
484       tmp[my_name] = ("Can't find my own primary/secondary IP"
485                       " in the node list")
486     else:
487       port = utils.GetDaemonPort(constants.NODED)
488       for name, pip, sip in what[constants.NV_NODENETTEST]:
489         fail = []
490         if not utils.TcpPing(pip, port, source=my_pip):
491           fail.append("primary")
492         if sip != pip:
493           if not utils.TcpPing(sip, port, source=my_sip):
494             fail.append("secondary")
495         if fail:
496           tmp[name] = ("failure using the %s interface(s)" %
497                        " and ".join(fail))
498
499   if constants.NV_LVLIST in what:
500     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
501
502   if constants.NV_INSTANCELIST in what:
503     result[constants.NV_INSTANCELIST] = GetInstanceList(
504       what[constants.NV_INSTANCELIST])
505
506   if constants.NV_VGLIST in what:
507     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
508
509   if constants.NV_VERSION in what:
510     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
511                                     constants.RELEASE_VERSION)
512
513   if constants.NV_HVINFO in what:
514     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
515     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
516
517   if constants.NV_DRBDLIST in what:
518     try:
519       used_minors = bdev.DRBD8.GetUsedDevs().keys()
520     except errors.BlockDeviceError, err:
521       logging.warning("Can't get used minors list", exc_info=True)
522       used_minors = str(err)
523     result[constants.NV_DRBDLIST] = used_minors
524
525   return result
526
527
528 def GetVolumeList(vg_name):
529   """Compute list of logical volumes and their size.
530
531   @type vg_name: str
532   @param vg_name: the volume group whose LVs we should list
533   @rtype: dict
534   @return:
535       dictionary of all partions (key) with value being a tuple of
536       their size (in MiB), inactive and online status::
537
538         {'test1': ('20.06', True, True)}
539
540       in case of errors, a string is returned with the error
541       details.
542
543   """
544   lvs = {}
545   sep = '|'
546   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547                          "--separator=%s" % sep,
548                          "-olv_name,lv_size,lv_attr", vg_name])
549   if result.failed:
550     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
551
552   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
553   for line in result.stdout.splitlines():
554     line = line.strip()
555     match = valid_line_re.match(line)
556     if not match:
557       logging.error("Invalid line returned from lvs output: '%s'", line)
558       continue
559     name, size, attr = match.groups()
560     inactive = attr[4] == '-'
561     online = attr[5] == 'o'
562     virtual = attr[0] == 'v'
563     if virtual:
564       # we don't want to report such volumes as existing, since they
565       # don't really hold data
566       continue
567     lvs[name] = (size, inactive, online)
568
569   return lvs
570
571
572 def ListVolumeGroups():
573   """List the volume groups and their size.
574
575   @rtype: dict
576   @return: dictionary with keys volume name and values the
577       size of the volume
578
579   """
580   return utils.ListVolumeGroups()
581
582
583 def NodeVolumes():
584   """List all volumes on this node.
585
586   @rtype: list
587   @return:
588     A list of dictionaries, each having four keys:
589       - name: the logical volume name,
590       - size: the size of the logical volume
591       - dev: the physical device on which the LV lives
592       - vg: the volume group to which it belongs
593
594     In case of errors, we return an empty list and log the
595     error.
596
597     Note that since a logical volume can live on multiple physical
598     volumes, the resulting list might include a logical volume
599     multiple times.
600
601   """
602   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
603                          "--separator=|",
604                          "--options=lv_name,lv_size,devices,vg_name"])
605   if result.failed:
606     _Fail("Failed to list logical volumes, lvs output: %s",
607           result.output)
608
609   def parse_dev(dev):
610     if '(' in dev:
611       return dev.split('(')[0]
612     else:
613       return dev
614
615   def map_line(line):
616     return {
617       'name': line[0].strip(),
618       'size': line[1].strip(),
619       'dev': parse_dev(line[2].strip()),
620       'vg': line[3].strip(),
621     }
622
623   return [map_line(line.split('|')) for line in result.stdout.splitlines()
624           if line.count('|') >= 3]
625
626
627 def BridgesExist(bridges_list):
628   """Check if a list of bridges exist on the current node.
629
630   @rtype: boolean
631   @return: C{True} if all of them exist, C{False} otherwise
632
633   """
634   missing = []
635   for bridge in bridges_list:
636     if not utils.BridgeExists(bridge):
637       missing.append(bridge)
638
639   if missing:
640     _Fail("Missing bridges %s", ", ".join(missing))
641
642
643 def GetInstanceList(hypervisor_list):
644   """Provides a list of instances.
645
646   @type hypervisor_list: list
647   @param hypervisor_list: the list of hypervisors to query information
648
649   @rtype: list
650   @return: a list of all running instances on the current node
651     - instance1.example.com
652     - instance2.example.com
653
654   """
655   results = []
656   for hname in hypervisor_list:
657     try:
658       names = hypervisor.GetHypervisor(hname).ListInstances()
659       results.extend(names)
660     except errors.HypervisorError, err:
661       _Fail("Error enumerating instances (hypervisor %s): %s",
662             hname, err, exc=True)
663
664   return results
665
666
667 def GetInstanceInfo(instance, hname):
668   """Gives back the information about an instance as a dictionary.
669
670   @type instance: string
671   @param instance: the instance name
672   @type hname: string
673   @param hname: the hypervisor type of the instance
674
675   @rtype: dict
676   @return: dictionary with the following keys:
677       - memory: memory size of instance (int)
678       - state: xen state of instance (string)
679       - time: cpu time of instance (float)
680
681   """
682   output = {}
683
684   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
685   if iinfo is not None:
686     output['memory'] = iinfo[2]
687     output['state'] = iinfo[4]
688     output['time'] = iinfo[5]
689
690   return output
691
692
693 def GetInstanceMigratable(instance):
694   """Gives whether an instance can be migrated.
695
696   @type instance: L{objects.Instance}
697   @param instance: object representing the instance to be checked.
698
699   @rtype: tuple
700   @return: tuple of (result, description) where:
701       - result: whether the instance can be migrated or not
702       - description: a description of the issue, if relevant
703
704   """
705   hyper = hypervisor.GetHypervisor(instance.hypervisor)
706   iname = instance.name
707   if iname not in hyper.ListInstances():
708     _Fail("Instance %s is not running", iname)
709
710   for idx in range(len(instance.disks)):
711     link_name = _GetBlockDevSymlinkPath(iname, idx)
712     if not os.path.islink(link_name):
713       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
714
715
716 def GetAllInstancesInfo(hypervisor_list):
717   """Gather data about all instances.
718
719   This is the equivalent of L{GetInstanceInfo}, except that it
720   computes data for all instances at once, thus being faster if one
721   needs data about more than one instance.
722
723   @type hypervisor_list: list
724   @param hypervisor_list: list of hypervisors to query for instance data
725
726   @rtype: dict
727   @return: dictionary of instance: data, with data having the following keys:
728       - memory: memory size of instance (int)
729       - state: xen state of instance (string)
730       - time: cpu time of instance (float)
731       - vcpus: the number of vcpus
732
733   """
734   output = {}
735
736   for hname in hypervisor_list:
737     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
738     if iinfo:
739       for name, _, memory, vcpus, state, times in iinfo:
740         value = {
741           'memory': memory,
742           'vcpus': vcpus,
743           'state': state,
744           'time': times,
745           }
746         if name in output:
747           # we only check static parameters, like memory and vcpus,
748           # and not state and time which can change between the
749           # invocations of the different hypervisors
750           for key in 'memory', 'vcpus':
751             if value[key] != output[name][key]:
752               _Fail("Instance %s is running twice"
753                     " with different parameters", name)
754         output[name] = value
755
756   return output
757
758
759 def InstanceOsAdd(instance, reinstall):
760   """Add an OS to an instance.
761
762   @type instance: L{objects.Instance}
763   @param instance: Instance whose OS is to be installed
764   @type reinstall: boolean
765   @param reinstall: whether this is an instance reinstall
766   @rtype: None
767
768   """
769   inst_os = OSFromDisk(instance.os)
770
771   create_env = OSEnvironment(instance, inst_os)
772   if reinstall:
773     create_env['INSTANCE_REINSTALL'] = "1"
774
775   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
776                                      instance.name, int(time.time()))
777
778   result = utils.RunCmd([inst_os.create_script], env=create_env,
779                         cwd=inst_os.path, output=logfile,)
780   if result.failed:
781     logging.error("os create command '%s' returned error: %s, logfile: %s,"
782                   " output: %s", result.cmd, result.fail_reason, logfile,
783                   result.output)
784     lines = [utils.SafeEncode(val)
785              for val in utils.TailFile(logfile, lines=20)]
786     _Fail("OS create script failed (%s), last lines in the"
787           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
788
789
790 def RunRenameInstance(instance, old_name):
791   """Run the OS rename script for an instance.
792
793   @type instance: L{objects.Instance}
794   @param instance: Instance whose OS is to be installed
795   @type old_name: string
796   @param old_name: previous instance name
797   @rtype: boolean
798   @return: the success of the operation
799
800   """
801   inst_os = OSFromDisk(instance.os)
802
803   rename_env = OSEnvironment(instance, inst_os)
804   rename_env['OLD_INSTANCE_NAME'] = old_name
805
806   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
807                                            old_name,
808                                            instance.name, int(time.time()))
809
810   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
811                         cwd=inst_os.path, output=logfile)
812
813   if result.failed:
814     logging.error("os create command '%s' returned error: %s output: %s",
815                   result.cmd, result.fail_reason, result.output)
816     lines = [utils.SafeEncode(val)
817              for val in utils.TailFile(logfile, lines=20)]
818     _Fail("OS rename script failed (%s), last lines in the"
819           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
820
821
822 def _GetVGInfo(vg_name):
823   """Get information about the volume group.
824
825   @type vg_name: str
826   @param vg_name: the volume group which we query
827   @rtype: dict
828   @return:
829     A dictionary with the following keys:
830       - C{vg_size} is the total size of the volume group in MiB
831       - C{vg_free} is the free size of the volume group in MiB
832       - C{pv_count} are the number of physical disks in that VG
833
834     If an error occurs during gathering of data, we return the same dict
835     with keys all set to None.
836
837   """
838   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
839
840   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
841                          "--nosuffix", "--units=m", "--separator=:", vg_name])
842
843   if retval.failed:
844     logging.error("volume group %s not present", vg_name)
845     return retdic
846   valarr = retval.stdout.strip().rstrip(':').split(':')
847   if len(valarr) == 3:
848     try:
849       retdic = {
850         "vg_size": int(round(float(valarr[0]), 0)),
851         "vg_free": int(round(float(valarr[1]), 0)),
852         "pv_count": int(valarr[2]),
853         }
854     except ValueError, err:
855       logging.exception("Fail to parse vgs output: %s", err)
856   else:
857     logging.error("vgs output has the wrong number of fields (expected"
858                   " three): %s", str(valarr))
859   return retdic
860
861
862 def _GetBlockDevSymlinkPath(instance_name, idx):
863   return os.path.join(constants.DISK_LINKS_DIR,
864                       "%s:%d" % (instance_name, idx))
865
866
867 def _SymlinkBlockDev(instance_name, device_path, idx):
868   """Set up symlinks to a instance's block device.
869
870   This is an auxiliary function run when an instance is start (on the primary
871   node) or when an instance is migrated (on the target node).
872
873
874   @param instance_name: the name of the target instance
875   @param device_path: path of the physical block device, on the node
876   @param idx: the disk index
877   @return: absolute path to the disk's symlink
878
879   """
880   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
881   try:
882     os.symlink(device_path, link_name)
883   except OSError, err:
884     if err.errno == errno.EEXIST:
885       if (not os.path.islink(link_name) or
886           os.readlink(link_name) != device_path):
887         os.remove(link_name)
888         os.symlink(device_path, link_name)
889     else:
890       raise
891
892   return link_name
893
894
895 def _RemoveBlockDevLinks(instance_name, disks):
896   """Remove the block device symlinks belonging to the given instance.
897
898   """
899   for idx, _ in enumerate(disks):
900     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
901     if os.path.islink(link_name):
902       try:
903         os.remove(link_name)
904       except OSError:
905         logging.exception("Can't remove symlink '%s'", link_name)
906
907
908 def _GatherAndLinkBlockDevs(instance):
909   """Set up an instance's block device(s).
910
911   This is run on the primary node at instance startup. The block
912   devices must be already assembled.
913
914   @type instance: L{objects.Instance}
915   @param instance: the instance whose disks we shoul assemble
916   @rtype: list
917   @return: list of (disk_object, device_path)
918
919   """
920   block_devices = []
921   for idx, disk in enumerate(instance.disks):
922     device = _RecursiveFindBD(disk)
923     if device is None:
924       raise errors.BlockDeviceError("Block device '%s' is not set up." %
925                                     str(disk))
926     device.Open()
927     try:
928       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
929     except OSError, e:
930       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
931                                     e.strerror)
932
933     block_devices.append((disk, link_name))
934
935   return block_devices
936
937
938 def StartInstance(instance):
939   """Start an instance.
940
941   @type instance: L{objects.Instance}
942   @param instance: the instance object
943   @rtype: None
944
945   """
946   running_instances = GetInstanceList([instance.hypervisor])
947
948   if instance.name in running_instances:
949     logging.info("Instance %s already running, not starting", instance.name)
950     return
951
952   try:
953     block_devices = _GatherAndLinkBlockDevs(instance)
954     hyper = hypervisor.GetHypervisor(instance.hypervisor)
955     hyper.StartInstance(instance, block_devices)
956   except errors.BlockDeviceError, err:
957     _Fail("Block device error: %s", err, exc=True)
958   except errors.HypervisorError, err:
959     _RemoveBlockDevLinks(instance.name, instance.disks)
960     _Fail("Hypervisor error: %s", err, exc=True)
961
962
963 def InstanceShutdown(instance, timeout):
964   """Shut an instance down.
965
966   @note: this functions uses polling with a hardcoded timeout.
967
968   @type instance: L{objects.Instance}
969   @param instance: the instance object
970   @type timeout: integer
971   @param timeout: maximum timeout for soft shutdown
972   @rtype: None
973
974   """
975   hv_name = instance.hypervisor
976   hyper = hypervisor.GetHypervisor(hv_name)
977   running_instances = hyper.ListInstances()
978   iname = instance.name
979
980   if iname not in running_instances:
981     logging.info("Instance %s not running, doing nothing", iname)
982     return
983
984   start = time.time()
985   end = start + timeout
986   sleep_time = 1
987
988   tried_once = False
989   while not tried_once and time.time() < end:
990     try:
991       hyper.StopInstance(instance, retry=tried_once)
992     except errors.HypervisorError, err:
993       _Fail("Failed to stop instance %s: %s", iname, err)
994     tried_once = True
995     time.sleep(sleep_time)
996     if instance.name not in hyper.ListInstances():
997       break
998     if sleep_time < 5:
999       # 1.2 behaves particularly good for our case:
1000       # it gives us 10 increasing steps and caps just slightly above 5 seconds
1001       sleep_time *= 1.2
1002   else:
1003     # the shutdown did not succeed
1004     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1005
1006     try:
1007       hyper.StopInstance(instance, force=True)
1008     except errors.HypervisorError, err:
1009       _Fail("Failed to force stop instance %s: %s", iname, err)
1010
1011     time.sleep(1)
1012     if instance.name in GetInstanceList([hv_name]):
1013       _Fail("Could not shutdown instance %s even by destroy", iname)
1014
1015   _RemoveBlockDevLinks(iname, instance.disks)
1016
1017
1018 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1019   """Reboot an instance.
1020
1021   @type instance: L{objects.Instance}
1022   @param instance: the instance object to reboot
1023   @type reboot_type: str
1024   @param reboot_type: the type of reboot, one the following
1025     constants:
1026       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1027         instance OS, do not recreate the VM
1028       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1029         restart the VM (at the hypervisor level)
1030       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1031         not accepted here, since that mode is handled differently, in
1032         cmdlib, and translates into full stop and start of the
1033         instance (instead of a call_instance_reboot RPC)
1034   @type timeout: integer
1035   @param timeout: maximum timeout for soft shutdown
1036   @rtype: None
1037
1038   """
1039   running_instances = GetInstanceList([instance.hypervisor])
1040
1041   if instance.name not in running_instances:
1042     _Fail("Cannot reboot instance %s that is not running", instance.name)
1043
1044   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1045   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1046     try:
1047       hyper.RebootInstance(instance)
1048     except errors.HypervisorError, err:
1049       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1050   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1051     try:
1052       InstanceShutdown(instance, shutdown_timeout)
1053       return StartInstance(instance)
1054     except errors.HypervisorError, err:
1055       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1056   else:
1057     _Fail("Invalid reboot_type received: %s", reboot_type)
1058
1059
1060 def MigrationInfo(instance):
1061   """Gather information about an instance to be migrated.
1062
1063   @type instance: L{objects.Instance}
1064   @param instance: the instance definition
1065
1066   """
1067   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1068   try:
1069     info = hyper.MigrationInfo(instance)
1070   except errors.HypervisorError, err:
1071     _Fail("Failed to fetch migration information: %s", err, exc=True)
1072   return info
1073
1074
1075 def AcceptInstance(instance, info, target):
1076   """Prepare the node to accept an instance.
1077
1078   @type instance: L{objects.Instance}
1079   @param instance: the instance definition
1080   @type info: string/data (opaque)
1081   @param info: migration information, from the source node
1082   @type target: string
1083   @param target: target host (usually ip), on this node
1084
1085   """
1086   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1087   try:
1088     hyper.AcceptInstance(instance, info, target)
1089   except errors.HypervisorError, err:
1090     _Fail("Failed to accept instance: %s", err, exc=True)
1091
1092
1093 def FinalizeMigration(instance, info, success):
1094   """Finalize any preparation to accept an instance.
1095
1096   @type instance: L{objects.Instance}
1097   @param instance: the instance definition
1098   @type info: string/data (opaque)
1099   @param info: migration information, from the source node
1100   @type success: boolean
1101   @param success: whether the migration was a success or a failure
1102
1103   """
1104   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1105   try:
1106     hyper.FinalizeMigration(instance, info, success)
1107   except errors.HypervisorError, err:
1108     _Fail("Failed to finalize migration: %s", err, exc=True)
1109
1110
1111 def MigrateInstance(instance, target, live):
1112   """Migrates an instance to another node.
1113
1114   @type instance: L{objects.Instance}
1115   @param instance: the instance definition
1116   @type target: string
1117   @param target: the target node name
1118   @type live: boolean
1119   @param live: whether the migration should be done live or not (the
1120       interpretation of this parameter is left to the hypervisor)
1121   @rtype: tuple
1122   @return: a tuple of (success, msg) where:
1123       - succes is a boolean denoting the success/failure of the operation
1124       - msg is a string with details in case of failure
1125
1126   """
1127   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1128
1129   try:
1130     hyper.MigrateInstance(instance.name, target, live)
1131   except errors.HypervisorError, err:
1132     _Fail("Failed to migrate instance: %s", err, exc=True)
1133
1134
1135 def BlockdevCreate(disk, size, owner, on_primary, info):
1136   """Creates a block device for an instance.
1137
1138   @type disk: L{objects.Disk}
1139   @param disk: the object describing the disk we should create
1140   @type size: int
1141   @param size: the size of the physical underlying device, in MiB
1142   @type owner: str
1143   @param owner: the name of the instance for which disk is created,
1144       used for device cache data
1145   @type on_primary: boolean
1146   @param on_primary:  indicates if it is the primary node or not
1147   @type info: string
1148   @param info: string that will be sent to the physical device
1149       creation, used for example to set (LVM) tags on LVs
1150
1151   @return: the new unique_id of the device (this can sometime be
1152       computed only after creation), or None. On secondary nodes,
1153       it's not required to return anything.
1154
1155   """
1156   clist = []
1157   if disk.children:
1158     for child in disk.children:
1159       try:
1160         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1161       except errors.BlockDeviceError, err:
1162         _Fail("Can't assemble device %s: %s", child, err)
1163       if on_primary or disk.AssembleOnSecondary():
1164         # we need the children open in case the device itself has to
1165         # be assembled
1166         try:
1167           crdev.Open()
1168         except errors.BlockDeviceError, err:
1169           _Fail("Can't make child '%s' read-write: %s", child, err)
1170       clist.append(crdev)
1171
1172   try:
1173     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1174   except errors.BlockDeviceError, err:
1175     _Fail("Can't create block device: %s", err)
1176
1177   if on_primary or disk.AssembleOnSecondary():
1178     try:
1179       device.Assemble()
1180     except errors.BlockDeviceError, err:
1181       _Fail("Can't assemble device after creation, unusual event: %s", err)
1182     device.SetSyncSpeed(constants.SYNC_SPEED)
1183     if on_primary or disk.OpenOnSecondary():
1184       try:
1185         device.Open(force=True)
1186       except errors.BlockDeviceError, err:
1187         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1188     DevCacheManager.UpdateCache(device.dev_path, owner,
1189                                 on_primary, disk.iv_name)
1190
1191   device.SetInfo(info)
1192
1193   return device.unique_id
1194
1195
1196 def BlockdevRemove(disk):
1197   """Remove a block device.
1198
1199   @note: This is intended to be called recursively.
1200
1201   @type disk: L{objects.Disk}
1202   @param disk: the disk object we should remove
1203   @rtype: boolean
1204   @return: the success of the operation
1205
1206   """
1207   msgs = []
1208   try:
1209     rdev = _RecursiveFindBD(disk)
1210   except errors.BlockDeviceError, err:
1211     # probably can't attach
1212     logging.info("Can't attach to device %s in remove", disk)
1213     rdev = None
1214   if rdev is not None:
1215     r_path = rdev.dev_path
1216     try:
1217       rdev.Remove()
1218     except errors.BlockDeviceError, err:
1219       msgs.append(str(err))
1220     if not msgs:
1221       DevCacheManager.RemoveCache(r_path)
1222
1223   if disk.children:
1224     for child in disk.children:
1225       try:
1226         BlockdevRemove(child)
1227       except RPCFail, err:
1228         msgs.append(str(err))
1229
1230   if msgs:
1231     _Fail("; ".join(msgs))
1232
1233
1234 def _RecursiveAssembleBD(disk, owner, as_primary):
1235   """Activate a block device for an instance.
1236
1237   This is run on the primary and secondary nodes for an instance.
1238
1239   @note: this function is called recursively.
1240
1241   @type disk: L{objects.Disk}
1242   @param disk: the disk we try to assemble
1243   @type owner: str
1244   @param owner: the name of the instance which owns the disk
1245   @type as_primary: boolean
1246   @param as_primary: if we should make the block device
1247       read/write
1248
1249   @return: the assembled device or None (in case no device
1250       was assembled)
1251   @raise errors.BlockDeviceError: in case there is an error
1252       during the activation of the children or the device
1253       itself
1254
1255   """
1256   children = []
1257   if disk.children:
1258     mcn = disk.ChildrenNeeded()
1259     if mcn == -1:
1260       mcn = 0 # max number of Nones allowed
1261     else:
1262       mcn = len(disk.children) - mcn # max number of Nones
1263     for chld_disk in disk.children:
1264       try:
1265         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1266       except errors.BlockDeviceError, err:
1267         if children.count(None) >= mcn:
1268           raise
1269         cdev = None
1270         logging.error("Error in child activation (but continuing): %s",
1271                       str(err))
1272       children.append(cdev)
1273
1274   if as_primary or disk.AssembleOnSecondary():
1275     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1276     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1277     result = r_dev
1278     if as_primary or disk.OpenOnSecondary():
1279       r_dev.Open()
1280     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1281                                 as_primary, disk.iv_name)
1282
1283   else:
1284     result = True
1285   return result
1286
1287
1288 def BlockdevAssemble(disk, owner, as_primary):
1289   """Activate a block device for an instance.
1290
1291   This is a wrapper over _RecursiveAssembleBD.
1292
1293   @rtype: str or boolean
1294   @return: a C{/dev/...} path for primary nodes, and
1295       C{True} for secondary nodes
1296
1297   """
1298   try:
1299     result = _RecursiveAssembleBD(disk, owner, as_primary)
1300     if isinstance(result, bdev.BlockDev):
1301       result = result.dev_path
1302   except errors.BlockDeviceError, err:
1303     _Fail("Error while assembling disk: %s", err, exc=True)
1304
1305   return result
1306
1307
1308 def BlockdevShutdown(disk):
1309   """Shut down a block device.
1310
1311   First, if the device is assembled (Attach() is successful), then
1312   the device is shutdown. Then the children of the device are
1313   shutdown.
1314
1315   This function is called recursively. Note that we don't cache the
1316   children or such, as oppossed to assemble, shutdown of different
1317   devices doesn't require that the upper device was active.
1318
1319   @type disk: L{objects.Disk}
1320   @param disk: the description of the disk we should
1321       shutdown
1322   @rtype: None
1323
1324   """
1325   msgs = []
1326   r_dev = _RecursiveFindBD(disk)
1327   if r_dev is not None:
1328     r_path = r_dev.dev_path
1329     try:
1330       r_dev.Shutdown()
1331       DevCacheManager.RemoveCache(r_path)
1332     except errors.BlockDeviceError, err:
1333       msgs.append(str(err))
1334
1335   if disk.children:
1336     for child in disk.children:
1337       try:
1338         BlockdevShutdown(child)
1339       except RPCFail, err:
1340         msgs.append(str(err))
1341
1342   if msgs:
1343     _Fail("; ".join(msgs))
1344
1345
1346 def BlockdevAddchildren(parent_cdev, new_cdevs):
1347   """Extend a mirrored block device.
1348
1349   @type parent_cdev: L{objects.Disk}
1350   @param parent_cdev: the disk to which we should add children
1351   @type new_cdevs: list of L{objects.Disk}
1352   @param new_cdevs: the list of children which we should add
1353   @rtype: None
1354
1355   """
1356   parent_bdev = _RecursiveFindBD(parent_cdev)
1357   if parent_bdev is None:
1358     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1359   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1360   if new_bdevs.count(None) > 0:
1361     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1362   parent_bdev.AddChildren(new_bdevs)
1363
1364
1365 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1366   """Shrink a mirrored block device.
1367
1368   @type parent_cdev: L{objects.Disk}
1369   @param parent_cdev: the disk from which we should remove children
1370   @type new_cdevs: list of L{objects.Disk}
1371   @param new_cdevs: the list of children which we should remove
1372   @rtype: None
1373
1374   """
1375   parent_bdev = _RecursiveFindBD(parent_cdev)
1376   if parent_bdev is None:
1377     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1378   devs = []
1379   for disk in new_cdevs:
1380     rpath = disk.StaticDevPath()
1381     if rpath is None:
1382       bd = _RecursiveFindBD(disk)
1383       if bd is None:
1384         _Fail("Can't find device %s while removing children", disk)
1385       else:
1386         devs.append(bd.dev_path)
1387     else:
1388       devs.append(rpath)
1389   parent_bdev.RemoveChildren(devs)
1390
1391
1392 def BlockdevGetmirrorstatus(disks):
1393   """Get the mirroring status of a list of devices.
1394
1395   @type disks: list of L{objects.Disk}
1396   @param disks: the list of disks which we should query
1397   @rtype: disk
1398   @return:
1399       a list of (mirror_done, estimated_time) tuples, which
1400       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1401   @raise errors.BlockDeviceError: if any of the disks cannot be
1402       found
1403
1404   """
1405   stats = []
1406   for dsk in disks:
1407     rbd = _RecursiveFindBD(dsk)
1408     if rbd is None:
1409       _Fail("Can't find device %s", dsk)
1410
1411     stats.append(rbd.CombinedSyncStatus())
1412
1413   return stats
1414
1415
1416 def _RecursiveFindBD(disk):
1417   """Check if a device is activated.
1418
1419   If so, return information about the real device.
1420
1421   @type disk: L{objects.Disk}
1422   @param disk: the disk object we need to find
1423
1424   @return: None if the device can't be found,
1425       otherwise the device instance
1426
1427   """
1428   children = []
1429   if disk.children:
1430     for chdisk in disk.children:
1431       children.append(_RecursiveFindBD(chdisk))
1432
1433   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1434
1435
1436 def BlockdevFind(disk):
1437   """Check if a device is activated.
1438
1439   If it is, return information about the real device.
1440
1441   @type disk: L{objects.Disk}
1442   @param disk: the disk to find
1443   @rtype: None or objects.BlockDevStatus
1444   @return: None if the disk cannot be found, otherwise a the current
1445            information
1446
1447   """
1448   try:
1449     rbd = _RecursiveFindBD(disk)
1450   except errors.BlockDeviceError, err:
1451     _Fail("Failed to find device: %s", err, exc=True)
1452
1453   if rbd is None:
1454     return None
1455
1456   return rbd.GetSyncStatus()
1457
1458
1459 def BlockdevGetsize(disks):
1460   """Computes the size of the given disks.
1461
1462   If a disk is not found, returns None instead.
1463
1464   @type disks: list of L{objects.Disk}
1465   @param disks: the list of disk to compute the size for
1466   @rtype: list
1467   @return: list with elements None if the disk cannot be found,
1468       otherwise the size
1469
1470   """
1471   result = []
1472   for cf in disks:
1473     try:
1474       rbd = _RecursiveFindBD(cf)
1475     except errors.BlockDeviceError, err:
1476       result.append(None)
1477       continue
1478     if rbd is None:
1479       result.append(None)
1480     else:
1481       result.append(rbd.GetActualSize())
1482   return result
1483
1484
1485 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1486   """Export a block device to a remote node.
1487
1488   @type disk: L{objects.Disk}
1489   @param disk: the description of the disk to export
1490   @type dest_node: str
1491   @param dest_node: the destination node to export to
1492   @type dest_path: str
1493   @param dest_path: the destination path on the target node
1494   @type cluster_name: str
1495   @param cluster_name: the cluster name, needed for SSH hostalias
1496   @rtype: None
1497
1498   """
1499   real_disk = _RecursiveFindBD(disk)
1500   if real_disk is None:
1501     _Fail("Block device '%s' is not set up", disk)
1502
1503   real_disk.Open()
1504
1505   # the block size on the read dd is 1MiB to match our units
1506   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1507                                "dd if=%s bs=1048576 count=%s",
1508                                real_disk.dev_path, str(disk.size))
1509
1510   # we set here a smaller block size as, due to ssh buffering, more
1511   # than 64-128k will mostly ignored; we use nocreat to fail if the
1512   # device is not already there or we pass a wrong path; we use
1513   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1514   # to not buffer too much memory; this means that at best, we flush
1515   # every 64k, which will not be very fast
1516   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1517                                 " oflag=dsync", dest_path)
1518
1519   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1520                                                    constants.GANETI_RUNAS,
1521                                                    destcmd)
1522
1523   # all commands have been checked, so we're safe to combine them
1524   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1525
1526   result = utils.RunCmd(["bash", "-c", command])
1527
1528   if result.failed:
1529     _Fail("Disk copy command '%s' returned error: %s"
1530           " output: %s", command, result.fail_reason, result.output)
1531
1532
1533 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1534   """Write a file to the filesystem.
1535
1536   This allows the master to overwrite(!) a file. It will only perform
1537   the operation if the file belongs to a list of configuration files.
1538
1539   @type file_name: str
1540   @param file_name: the target file name
1541   @type data: str
1542   @param data: the new contents of the file
1543   @type mode: int
1544   @param mode: the mode to give the file (can be None)
1545   @type uid: int
1546   @param uid: the owner of the file (can be -1 for default)
1547   @type gid: int
1548   @param gid: the group of the file (can be -1 for default)
1549   @type atime: float
1550   @param atime: the atime to set on the file (can be None)
1551   @type mtime: float
1552   @param mtime: the mtime to set on the file (can be None)
1553   @rtype: None
1554
1555   """
1556   if not os.path.isabs(file_name):
1557     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1558
1559   if file_name not in _ALLOWED_UPLOAD_FILES:
1560     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1561           file_name)
1562
1563   raw_data = _Decompress(data)
1564
1565   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1566                   atime=atime, mtime=mtime)
1567
1568
1569 def WriteSsconfFiles(values):
1570   """Update all ssconf files.
1571
1572   Wrapper around the SimpleStore.WriteFiles.
1573
1574   """
1575   ssconf.SimpleStore().WriteFiles(values)
1576
1577
1578 def _ErrnoOrStr(err):
1579   """Format an EnvironmentError exception.
1580
1581   If the L{err} argument has an errno attribute, it will be looked up
1582   and converted into a textual C{E...} description. Otherwise the
1583   string representation of the error will be returned.
1584
1585   @type err: L{EnvironmentError}
1586   @param err: the exception to format
1587
1588   """
1589   if hasattr(err, 'errno'):
1590     detail = errno.errorcode[err.errno]
1591   else:
1592     detail = str(err)
1593   return detail
1594
1595
1596 def _OSOndiskAPIVersion(name, os_dir):
1597   """Compute and return the API version of a given OS.
1598
1599   This function will try to read the API version of the OS given by
1600   the 'name' parameter and residing in the 'os_dir' directory.
1601
1602   @type name: str
1603   @param name: the OS name we should look for
1604   @type os_dir: str
1605   @param os_dir: the directory inwhich we should look for the OS
1606   @rtype: tuple
1607   @return: tuple (status, data) with status denoting the validity and
1608       data holding either the vaid versions or an error message
1609
1610   """
1611   api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1612
1613   try:
1614     st = os.stat(api_file)
1615   except EnvironmentError, err:
1616     return False, ("Required file '%s' not found under path %s: %s" %
1617                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1618
1619   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1620     return False, ("File '%s' in %s is not a regular file" %
1621                    (constants.OS_API_FILE, os_dir))
1622
1623   try:
1624     api_versions = utils.ReadFile(api_file).splitlines()
1625   except EnvironmentError, err:
1626     return False, ("Error while reading the API version file at %s: %s" %
1627                    (api_file, _ErrnoOrStr(err)))
1628
1629   try:
1630     api_versions = [int(version.strip()) for version in api_versions]
1631   except (TypeError, ValueError), err:
1632     return False, ("API version(s) can't be converted to integer: %s" %
1633                    str(err))
1634
1635   return True, api_versions
1636
1637
1638 def DiagnoseOS(top_dirs=None):
1639   """Compute the validity for all OSes.
1640
1641   @type top_dirs: list
1642   @param top_dirs: the list of directories in which to
1643       search (if not given defaults to
1644       L{constants.OS_SEARCH_PATH})
1645   @rtype: list of L{objects.OS}
1646   @return: a list of tuples (name, path, status, diagnose, variants)
1647       for all (potential) OSes under all search paths, where:
1648           - name is the (potential) OS name
1649           - path is the full path to the OS
1650           - status True/False is the validity of the OS
1651           - diagnose is the error message for an invalid OS, otherwise empty
1652           - variants is a list of supported OS variants, if any
1653
1654   """
1655   if top_dirs is None:
1656     top_dirs = constants.OS_SEARCH_PATH
1657
1658   result = []
1659   for dir_name in top_dirs:
1660     if os.path.isdir(dir_name):
1661       try:
1662         f_names = utils.ListVisibleFiles(dir_name)
1663       except EnvironmentError, err:
1664         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1665         break
1666       for name in f_names:
1667         os_path = os.path.sep.join([dir_name, name])
1668         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1669         if status:
1670           diagnose = ""
1671           variants = os_inst.supported_variants
1672         else:
1673           diagnose = os_inst
1674           variants = []
1675         result.append((name, os_path, status, diagnose, variants))
1676
1677   return result
1678
1679
1680 def _TryOSFromDisk(name, base_dir=None):
1681   """Create an OS instance from disk.
1682
1683   This function will return an OS instance if the given name is a
1684   valid OS name.
1685
1686   @type base_dir: string
1687   @keyword base_dir: Base directory containing OS installations.
1688                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1689   @rtype: tuple
1690   @return: success and either the OS instance if we find a valid one,
1691       or error message
1692
1693   """
1694   if base_dir is None:
1695     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1696     if os_dir is None:
1697       return False, "Directory for OS %s not found in search path" % name
1698   else:
1699     os_dir = os.path.sep.join([base_dir, name])
1700
1701   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1702   if not status:
1703     # push the error up
1704     return status, api_versions
1705
1706   if not constants.OS_API_VERSIONS.intersection(api_versions):
1707     return False, ("API version mismatch for path '%s': found %s, want %s." %
1708                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1709
1710   # OS Files dictionary, we will populate it with the absolute path names
1711   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1712
1713   if max(api_versions) >= constants.OS_API_V15:
1714     os_files[constants.OS_VARIANTS_FILE] = ''
1715
1716   for name in os_files:
1717     os_files[name] = os.path.sep.join([os_dir, name])
1718
1719     try:
1720       st = os.stat(os_files[name])
1721     except EnvironmentError, err:
1722       return False, ("File '%s' under path '%s' is missing (%s)" %
1723                      (name, os_dir, _ErrnoOrStr(err)))
1724
1725     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1726       return False, ("File '%s' under path '%s' is not a regular file" %
1727                      (name, os_dir))
1728
1729     if name in constants.OS_SCRIPTS:
1730       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1731         return False, ("File '%s' under path '%s' is not executable" %
1732                        (name, os_dir))
1733
1734   variants = None
1735   if constants.OS_VARIANTS_FILE in os_files:
1736     variants_file = os_files[constants.OS_VARIANTS_FILE]
1737     try:
1738       variants = utils.ReadFile(variants_file).splitlines()
1739     except EnvironmentError, err:
1740       return False, ("Error while reading the OS variants file at %s: %s" %
1741                      (variants_file, _ErrnoOrStr(err)))
1742     if not variants:
1743       return False, ("No supported os variant found")
1744
1745   os_obj = objects.OS(name=name, path=os_dir,
1746                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1747                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1748                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1749                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1750                       supported_variants=variants,
1751                       api_versions=api_versions)
1752   return True, os_obj
1753
1754
1755 def OSFromDisk(name, base_dir=None):
1756   """Create an OS instance from disk.
1757
1758   This function will return an OS instance if the given name is a
1759   valid OS name. Otherwise, it will raise an appropriate
1760   L{RPCFail} exception, detailing why this is not a valid OS.
1761
1762   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1763   an exception but returns true/false status data.
1764
1765   @type base_dir: string
1766   @keyword base_dir: Base directory containing OS installations.
1767                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1768   @rtype: L{objects.OS}
1769   @return: the OS instance if we find a valid one
1770   @raise RPCFail: if we don't find a valid OS
1771
1772   """
1773   name_only = name.split("+", 1)[0]
1774   status, payload = _TryOSFromDisk(name_only, base_dir)
1775
1776   if not status:
1777     _Fail(payload)
1778
1779   return payload
1780
1781
1782 def OSEnvironment(instance, os, debug=0):
1783   """Calculate the environment for an os script.
1784
1785   @type instance: L{objects.Instance}
1786   @param instance: target instance for the os script run
1787   @type os: L{objects.OS}
1788   @param os: operating system for which the environment is being built
1789   @type debug: integer
1790   @param debug: debug level (0 or 1, for OS Api 10)
1791   @rtype: dict
1792   @return: dict of environment variables
1793   @raise errors.BlockDeviceError: if the block device
1794       cannot be found
1795
1796   """
1797   result = {}
1798   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1799   result['OS_API_VERSION'] = '%d' % api_version
1800   result['INSTANCE_NAME'] = instance.name
1801   result['INSTANCE_OS'] = instance.os
1802   result['HYPERVISOR'] = instance.hypervisor
1803   result['DISK_COUNT'] = '%d' % len(instance.disks)
1804   result['NIC_COUNT'] = '%d' % len(instance.nics)
1805   result['DEBUG_LEVEL'] = '%d' % debug
1806   if api_version >= constants.OS_API_V15:
1807     try:
1808       variant = instance.os.split('+', 1)[1]
1809     except IndexError:
1810       variant = os.supported_variants[0]
1811     result['OS_VARIANT'] = variant
1812   for idx, disk in enumerate(instance.disks):
1813     real_disk = _RecursiveFindBD(disk)
1814     if real_disk is None:
1815       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1816                                     str(disk))
1817     real_disk.Open()
1818     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1819     result['DISK_%d_ACCESS' % idx] = disk.mode
1820     if constants.HV_DISK_TYPE in instance.hvparams:
1821       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1822         instance.hvparams[constants.HV_DISK_TYPE]
1823     if disk.dev_type in constants.LDS_BLOCK:
1824       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1825     elif disk.dev_type == constants.LD_FILE:
1826       result['DISK_%d_BACKEND_TYPE' % idx] = \
1827         'file:%s' % disk.physical_id[0]
1828   for idx, nic in enumerate(instance.nics):
1829     result['NIC_%d_MAC' % idx] = nic.mac
1830     if nic.ip:
1831       result['NIC_%d_IP' % idx] = nic.ip
1832     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1833     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1834       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1835     if nic.nicparams[constants.NIC_LINK]:
1836       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1837     if constants.HV_NIC_TYPE in instance.hvparams:
1838       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1839         instance.hvparams[constants.HV_NIC_TYPE]
1840
1841   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1842     for key, value in source.items():
1843       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1844
1845   return result
1846
1847 def BlockdevGrow(disk, amount):
1848   """Grow a stack of block devices.
1849
1850   This function is called recursively, with the childrens being the
1851   first ones to resize.
1852
1853   @type disk: L{objects.Disk}
1854   @param disk: the disk to be grown
1855   @rtype: (status, result)
1856   @return: a tuple with the status of the operation
1857       (True/False), and the errors message if status
1858       is False
1859
1860   """
1861   r_dev = _RecursiveFindBD(disk)
1862   if r_dev is None:
1863     _Fail("Cannot find block device %s", disk)
1864
1865   try:
1866     r_dev.Grow(amount)
1867   except errors.BlockDeviceError, err:
1868     _Fail("Failed to grow block device: %s", err, exc=True)
1869
1870
1871 def BlockdevSnapshot(disk):
1872   """Create a snapshot copy of a block device.
1873
1874   This function is called recursively, and the snapshot is actually created
1875   just for the leaf lvm backend device.
1876
1877   @type disk: L{objects.Disk}
1878   @param disk: the disk to be snapshotted
1879   @rtype: string
1880   @return: snapshot disk path
1881
1882   """
1883   if disk.children:
1884     if len(disk.children) == 1:
1885       # only one child, let's recurse on it
1886       return BlockdevSnapshot(disk.children[0])
1887     else:
1888       # more than one child, choose one that matches
1889       for child in disk.children:
1890         if child.size == disk.size:
1891           # return implies breaking the loop
1892           return BlockdevSnapshot(child)
1893   elif disk.dev_type == constants.LD_LV:
1894     r_dev = _RecursiveFindBD(disk)
1895     if r_dev is not None:
1896       # let's stay on the safe side and ask for the full size, for now
1897       return r_dev.Snapshot(disk.size)
1898     else:
1899       _Fail("Cannot find block device %s", disk)
1900   else:
1901     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1902           disk.unique_id, disk.dev_type)
1903
1904
1905 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1906   """Export a block device snapshot to a remote node.
1907
1908   @type disk: L{objects.Disk}
1909   @param disk: the description of the disk to export
1910   @type dest_node: str
1911   @param dest_node: the destination node to export to
1912   @type instance: L{objects.Instance}
1913   @param instance: the instance object to whom the disk belongs
1914   @type cluster_name: str
1915   @param cluster_name: the cluster name, needed for SSH hostalias
1916   @type idx: int
1917   @param idx: the index of the disk in the instance's disk list,
1918       used to export to the OS scripts environment
1919   @rtype: None
1920
1921   """
1922   inst_os = OSFromDisk(instance.os)
1923   export_env = OSEnvironment(instance, inst_os)
1924
1925   export_script = inst_os.export_script
1926
1927   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1928                                      instance.name, int(time.time()))
1929   if not os.path.exists(constants.LOG_OS_DIR):
1930     os.mkdir(constants.LOG_OS_DIR, 0750)
1931   real_disk = _RecursiveFindBD(disk)
1932   if real_disk is None:
1933     _Fail("Block device '%s' is not set up", disk)
1934
1935   real_disk.Open()
1936
1937   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1938   export_env['EXPORT_INDEX'] = str(idx)
1939
1940   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1941   destfile = disk.physical_id[1]
1942
1943   # the target command is built out of three individual commands,
1944   # which are joined by pipes; we check each individual command for
1945   # valid parameters
1946   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1947                                inst_os.path, export_script, logfile)
1948
1949   comprcmd = "gzip"
1950
1951   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1952                                 destdir, destdir, destfile)
1953   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1954                                                    constants.GANETI_RUNAS,
1955                                                    destcmd)
1956
1957   # all commands have been checked, so we're safe to combine them
1958   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1959
1960   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1961
1962   if result.failed:
1963     _Fail("OS snapshot export command '%s' returned error: %s"
1964           " output: %s", command, result.fail_reason, result.output)
1965
1966
1967 def FinalizeExport(instance, snap_disks):
1968   """Write out the export configuration information.
1969
1970   @type instance: L{objects.Instance}
1971   @param instance: the instance which we export, used for
1972       saving configuration
1973   @type snap_disks: list of L{objects.Disk}
1974   @param snap_disks: list of snapshot block devices, which
1975       will be used to get the actual name of the dump file
1976
1977   @rtype: None
1978
1979   """
1980   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1981   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1982
1983   config = objects.SerializableConfigParser()
1984
1985   config.add_section(constants.INISECT_EXP)
1986   config.set(constants.INISECT_EXP, 'version', '0')
1987   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1988   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1989   config.set(constants.INISECT_EXP, 'os', instance.os)
1990   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1991
1992   config.add_section(constants.INISECT_INS)
1993   config.set(constants.INISECT_INS, 'name', instance.name)
1994   config.set(constants.INISECT_INS, 'memory', '%d' %
1995              instance.beparams[constants.BE_MEMORY])
1996   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1997              instance.beparams[constants.BE_VCPUS])
1998   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1999
2000   nic_total = 0
2001   for nic_count, nic in enumerate(instance.nics):
2002     nic_total += 1
2003     config.set(constants.INISECT_INS, 'nic%d_mac' %
2004                nic_count, '%s' % nic.mac)
2005     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2006     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2007                '%s' % nic.bridge)
2008   # TODO: redundant: on load can read nics until it doesn't exist
2009   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2010
2011   disk_total = 0
2012   for disk_count, disk in enumerate(snap_disks):
2013     if disk:
2014       disk_total += 1
2015       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2016                  ('%s' % disk.iv_name))
2017       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2018                  ('%s' % disk.physical_id[1]))
2019       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2020                  ('%d' % disk.size))
2021
2022   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2023
2024   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
2025                   data=config.Dumps())
2026   shutil.rmtree(finaldestdir, True)
2027   shutil.move(destdir, finaldestdir)
2028
2029
2030 def ExportInfo(dest):
2031   """Get export configuration information.
2032
2033   @type dest: str
2034   @param dest: directory containing the export
2035
2036   @rtype: L{objects.SerializableConfigParser}
2037   @return: a serializable config file containing the
2038       export info
2039
2040   """
2041   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2042
2043   config = objects.SerializableConfigParser()
2044   config.read(cff)
2045
2046   if (not config.has_section(constants.INISECT_EXP) or
2047       not config.has_section(constants.INISECT_INS)):
2048     _Fail("Export info file doesn't have the required fields")
2049
2050   return config.Dumps()
2051
2052
2053 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2054   """Import an os image into an instance.
2055
2056   @type instance: L{objects.Instance}
2057   @param instance: instance to import the disks into
2058   @type src_node: string
2059   @param src_node: source node for the disk images
2060   @type src_images: list of string
2061   @param src_images: absolute paths of the disk images
2062   @rtype: list of boolean
2063   @return: each boolean represent the success of importing the n-th disk
2064
2065   """
2066   inst_os = OSFromDisk(instance.os)
2067   import_env = OSEnvironment(instance, inst_os)
2068   import_script = inst_os.import_script
2069
2070   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2071                                         instance.name, int(time.time()))
2072   if not os.path.exists(constants.LOG_OS_DIR):
2073     os.mkdir(constants.LOG_OS_DIR, 0750)
2074
2075   comprcmd = "gunzip"
2076   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2077                                import_script, logfile)
2078
2079   final_result = []
2080   for idx, image in enumerate(src_images):
2081     if image:
2082       destcmd = utils.BuildShellCmd('cat %s', image)
2083       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2084                                                        constants.GANETI_RUNAS,
2085                                                        destcmd)
2086       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2087       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2088       import_env['IMPORT_INDEX'] = str(idx)
2089       result = utils.RunCmd(command, env=import_env)
2090       if result.failed:
2091         logging.error("Disk import command '%s' returned error: %s"
2092                       " output: %s", command, result.fail_reason,
2093                       result.output)
2094         final_result.append("error importing disk %d: %s, %s" %
2095                             (idx, result.fail_reason, result.output[-100]))
2096
2097   if final_result:
2098     _Fail("; ".join(final_result), log=False)
2099
2100
2101 def ListExports():
2102   """Return a list of exports currently available on this machine.
2103
2104   @rtype: list
2105   @return: list of the exports
2106
2107   """
2108   if os.path.isdir(constants.EXPORT_DIR):
2109     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2110   else:
2111     _Fail("No exports directory")
2112
2113
2114 def RemoveExport(export):
2115   """Remove an existing export from the node.
2116
2117   @type export: str
2118   @param export: the name of the export to remove
2119   @rtype: None
2120
2121   """
2122   target = os.path.join(constants.EXPORT_DIR, export)
2123
2124   try:
2125     shutil.rmtree(target)
2126   except EnvironmentError, err:
2127     _Fail("Error while removing the export: %s", err, exc=True)
2128
2129
2130 def BlockdevRename(devlist):
2131   """Rename a list of block devices.
2132
2133   @type devlist: list of tuples
2134   @param devlist: list of tuples of the form  (disk,
2135       new_logical_id, new_physical_id); disk is an
2136       L{objects.Disk} object describing the current disk,
2137       and new logical_id/physical_id is the name we
2138       rename it to
2139   @rtype: boolean
2140   @return: True if all renames succeeded, False otherwise
2141
2142   """
2143   msgs = []
2144   result = True
2145   for disk, unique_id in devlist:
2146     dev = _RecursiveFindBD(disk)
2147     if dev is None:
2148       msgs.append("Can't find device %s in rename" % str(disk))
2149       result = False
2150       continue
2151     try:
2152       old_rpath = dev.dev_path
2153       dev.Rename(unique_id)
2154       new_rpath = dev.dev_path
2155       if old_rpath != new_rpath:
2156         DevCacheManager.RemoveCache(old_rpath)
2157         # FIXME: we should add the new cache information here, like:
2158         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2159         # but we don't have the owner here - maybe parse from existing
2160         # cache? for now, we only lose lvm data when we rename, which
2161         # is less critical than DRBD or MD
2162     except errors.BlockDeviceError, err:
2163       msgs.append("Can't rename device '%s' to '%s': %s" %
2164                   (dev, unique_id, err))
2165       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2166       result = False
2167   if not result:
2168     _Fail("; ".join(msgs))
2169
2170
2171 def _TransformFileStorageDir(file_storage_dir):
2172   """Checks whether given file_storage_dir is valid.
2173
2174   Checks wheter the given file_storage_dir is within the cluster-wide
2175   default file_storage_dir stored in SimpleStore. Only paths under that
2176   directory are allowed.
2177
2178   @type file_storage_dir: str
2179   @param file_storage_dir: the path to check
2180
2181   @return: the normalized path if valid, None otherwise
2182
2183   """
2184   cfg = _GetConfig()
2185   file_storage_dir = os.path.normpath(file_storage_dir)
2186   base_file_storage_dir = cfg.GetFileStorageDir()
2187   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2188       base_file_storage_dir):
2189     _Fail("File storage directory '%s' is not under base file"
2190           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2191   return file_storage_dir
2192
2193
2194 def CreateFileStorageDir(file_storage_dir):
2195   """Create file storage directory.
2196
2197   @type file_storage_dir: str
2198   @param file_storage_dir: directory to create
2199
2200   @rtype: tuple
2201   @return: tuple with first element a boolean indicating wheter dir
2202       creation was successful or not
2203
2204   """
2205   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2206   if os.path.exists(file_storage_dir):
2207     if not os.path.isdir(file_storage_dir):
2208       _Fail("Specified storage dir '%s' is not a directory",
2209             file_storage_dir)
2210   else:
2211     try:
2212       os.makedirs(file_storage_dir, 0750)
2213     except OSError, err:
2214       _Fail("Cannot create file storage directory '%s': %s",
2215             file_storage_dir, err, exc=True)
2216
2217
2218 def RemoveFileStorageDir(file_storage_dir):
2219   """Remove file storage directory.
2220
2221   Remove it only if it's empty. If not log an error and return.
2222
2223   @type file_storage_dir: str
2224   @param file_storage_dir: the directory we should cleanup
2225   @rtype: tuple (success,)
2226   @return: tuple of one element, C{success}, denoting
2227       whether the operation was successful
2228
2229   """
2230   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2231   if os.path.exists(file_storage_dir):
2232     if not os.path.isdir(file_storage_dir):
2233       _Fail("Specified Storage directory '%s' is not a directory",
2234             file_storage_dir)
2235     # deletes dir only if empty, otherwise we want to fail the rpc call
2236     try:
2237       os.rmdir(file_storage_dir)
2238     except OSError, err:
2239       _Fail("Cannot remove file storage directory '%s': %s",
2240             file_storage_dir, err)
2241
2242
2243 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2244   """Rename the file storage directory.
2245
2246   @type old_file_storage_dir: str
2247   @param old_file_storage_dir: the current path
2248   @type new_file_storage_dir: str
2249   @param new_file_storage_dir: the name we should rename to
2250   @rtype: tuple (success,)
2251   @return: tuple of one element, C{success}, denoting
2252       whether the operation was successful
2253
2254   """
2255   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2256   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2257   if not os.path.exists(new_file_storage_dir):
2258     if os.path.isdir(old_file_storage_dir):
2259       try:
2260         os.rename(old_file_storage_dir, new_file_storage_dir)
2261       except OSError, err:
2262         _Fail("Cannot rename '%s' to '%s': %s",
2263               old_file_storage_dir, new_file_storage_dir, err)
2264     else:
2265       _Fail("Specified storage dir '%s' is not a directory",
2266             old_file_storage_dir)
2267   else:
2268     if os.path.exists(old_file_storage_dir):
2269       _Fail("Cannot rename '%s' to '%s': both locations exist",
2270             old_file_storage_dir, new_file_storage_dir)
2271
2272
2273 def _EnsureJobQueueFile(file_name):
2274   """Checks whether the given filename is in the queue directory.
2275
2276   @type file_name: str
2277   @param file_name: the file name we should check
2278   @rtype: None
2279   @raises RPCFail: if the file is not valid
2280
2281   """
2282   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2283   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2284
2285   if not result:
2286     _Fail("Passed job queue file '%s' does not belong to"
2287           " the queue directory '%s'", file_name, queue_dir)
2288
2289
2290 def JobQueueUpdate(file_name, content):
2291   """Updates a file in the queue directory.
2292
2293   This is just a wrapper over L{utils.WriteFile}, with proper
2294   checking.
2295
2296   @type file_name: str
2297   @param file_name: the job file name
2298   @type content: str
2299   @param content: the new job contents
2300   @rtype: boolean
2301   @return: the success of the operation
2302
2303   """
2304   _EnsureJobQueueFile(file_name)
2305
2306   # Write and replace the file atomically
2307   utils.WriteFile(file_name, data=_Decompress(content))
2308
2309
2310 def JobQueueRename(old, new):
2311   """Renames a job queue file.
2312
2313   This is just a wrapper over os.rename with proper checking.
2314
2315   @type old: str
2316   @param old: the old (actual) file name
2317   @type new: str
2318   @param new: the desired file name
2319   @rtype: tuple
2320   @return: the success of the operation and payload
2321
2322   """
2323   _EnsureJobQueueFile(old)
2324   _EnsureJobQueueFile(new)
2325
2326   utils.RenameFile(old, new, mkdir=True)
2327
2328
2329 def JobQueueSetDrainFlag(drain_flag):
2330   """Set the drain flag for the queue.
2331
2332   This will set or unset the queue drain flag.
2333
2334   @type drain_flag: boolean
2335   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2336   @rtype: truple
2337   @return: always True, None
2338   @warning: the function always returns True
2339
2340   """
2341   if drain_flag:
2342     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2343   else:
2344     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2345
2346
2347 def BlockdevClose(instance_name, disks):
2348   """Closes the given block devices.
2349
2350   This means they will be switched to secondary mode (in case of
2351   DRBD).
2352
2353   @param instance_name: if the argument is not empty, the symlinks
2354       of this instance will be removed
2355   @type disks: list of L{objects.Disk}
2356   @param disks: the list of disks to be closed
2357   @rtype: tuple (success, message)
2358   @return: a tuple of success and message, where success
2359       indicates the succes of the operation, and message
2360       which will contain the error details in case we
2361       failed
2362
2363   """
2364   bdevs = []
2365   for cf in disks:
2366     rd = _RecursiveFindBD(cf)
2367     if rd is None:
2368       _Fail("Can't find device %s", cf)
2369     bdevs.append(rd)
2370
2371   msg = []
2372   for rd in bdevs:
2373     try:
2374       rd.Close()
2375     except errors.BlockDeviceError, err:
2376       msg.append(str(err))
2377   if msg:
2378     _Fail("Can't make devices secondary: %s", ",".join(msg))
2379   else:
2380     if instance_name:
2381       _RemoveBlockDevLinks(instance_name, disks)
2382
2383
2384 def ValidateHVParams(hvname, hvparams):
2385   """Validates the given hypervisor parameters.
2386
2387   @type hvname: string
2388   @param hvname: the hypervisor name
2389   @type hvparams: dict
2390   @param hvparams: the hypervisor parameters to be validated
2391   @rtype: None
2392
2393   """
2394   try:
2395     hv_type = hypervisor.GetHypervisor(hvname)
2396     hv_type.ValidateParameters(hvparams)
2397   except errors.HypervisorError, err:
2398     _Fail(str(err), log=False)
2399
2400
2401 def DemoteFromMC():
2402   """Demotes the current node from master candidate role.
2403
2404   """
2405   # try to ensure we're not the master by mistake
2406   master, myself = ssconf.GetMasterAndMyself()
2407   if master == myself:
2408     _Fail("ssconf status shows I'm the master node, will not demote")
2409   pid_file = utils.DaemonPidFileName(constants.MASTERD)
2410   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2411     _Fail("The master daemon is running, will not demote")
2412   try:
2413     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2414       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2415   except EnvironmentError, err:
2416     if err.errno != errno.ENOENT:
2417       _Fail("Error while backing up cluster file: %s", err, exc=True)
2418   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2419
2420
2421 def _FindDisks(nodes_ip, disks):
2422   """Sets the physical ID on disks and returns the block devices.
2423
2424   """
2425   # set the correct physical ID
2426   my_name = utils.HostInfo().name
2427   for cf in disks:
2428     cf.SetPhysicalID(my_name, nodes_ip)
2429
2430   bdevs = []
2431
2432   for cf in disks:
2433     rd = _RecursiveFindBD(cf)
2434     if rd is None:
2435       _Fail("Can't find device %s", cf)
2436     bdevs.append(rd)
2437   return bdevs
2438
2439
2440 def DrbdDisconnectNet(nodes_ip, disks):
2441   """Disconnects the network on a list of drbd devices.
2442
2443   """
2444   bdevs = _FindDisks(nodes_ip, disks)
2445
2446   # disconnect disks
2447   for rd in bdevs:
2448     try:
2449       rd.DisconnectNet()
2450     except errors.BlockDeviceError, err:
2451       _Fail("Can't change network configuration to standalone mode: %s",
2452             err, exc=True)
2453
2454
2455 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2456   """Attaches the network on a list of drbd devices.
2457
2458   """
2459   bdevs = _FindDisks(nodes_ip, disks)
2460
2461   if multimaster:
2462     for idx, rd in enumerate(bdevs):
2463       try:
2464         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2465       except EnvironmentError, err:
2466         _Fail("Can't create symlink: %s", err)
2467   # reconnect disks, switch to new master configuration and if
2468   # needed primary mode
2469   for rd in bdevs:
2470     try:
2471       rd.AttachNet(multimaster)
2472     except errors.BlockDeviceError, err:
2473       _Fail("Can't change network configuration: %s", err)
2474   # wait until the disks are connected; we need to retry the re-attach
2475   # if the device becomes standalone, as this might happen if the one
2476   # node disconnects and reconnects in a different mode before the
2477   # other node reconnects; in this case, one or both of the nodes will
2478   # decide it has wrong configuration and switch to standalone
2479   RECONNECT_TIMEOUT = 2 * 60
2480   sleep_time = 0.100 # start with 100 miliseconds
2481   timeout_limit = time.time() + RECONNECT_TIMEOUT
2482   while time.time() < timeout_limit:
2483     all_connected = True
2484     for rd in bdevs:
2485       stats = rd.GetProcStatus()
2486       if not (stats.is_connected or stats.is_in_resync):
2487         all_connected = False
2488       if stats.is_standalone:
2489         # peer had different config info and this node became
2490         # standalone, even though this should not happen with the
2491         # new staged way of changing disk configs
2492         try:
2493           rd.AttachNet(multimaster)
2494         except errors.BlockDeviceError, err:
2495           _Fail("Can't change network configuration: %s", err)
2496     if all_connected:
2497       break
2498     time.sleep(sleep_time)
2499     sleep_time = min(5, sleep_time * 1.5)
2500   if not all_connected:
2501     _Fail("Timeout in disk reconnecting")
2502   if multimaster:
2503     # change to primary mode
2504     for rd in bdevs:
2505       try:
2506         rd.Open()
2507       except errors.BlockDeviceError, err:
2508         _Fail("Can't change to primary mode: %s", err)
2509
2510
2511 def DrbdWaitSync(nodes_ip, disks):
2512   """Wait until DRBDs have synchronized.
2513
2514   """
2515   bdevs = _FindDisks(nodes_ip, disks)
2516
2517   min_resync = 100
2518   alldone = True
2519   for rd in bdevs:
2520     stats = rd.GetProcStatus()
2521     if not (stats.is_connected or stats.is_in_resync):
2522       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2523     alldone = alldone and (not stats.is_in_resync)
2524     if stats.sync_percent is not None:
2525       min_resync = min(min_resync, stats.sync_percent)
2526
2527   return (alldone, min_resync)
2528
2529
2530 def PowercycleNode(hypervisor_type):
2531   """Hard-powercycle the node.
2532
2533   Because we need to return first, and schedule the powercycle in the
2534   background, we won't be able to report failures nicely.
2535
2536   """
2537   hyper = hypervisor.GetHypervisor(hypervisor_type)
2538   try:
2539     pid = os.fork()
2540   except OSError:
2541     # if we can't fork, we'll pretend that we're in the child process
2542     pid = 0
2543   if pid > 0:
2544     return "Reboot scheduled in 5 seconds"
2545   time.sleep(5)
2546   hyper.PowercycleNode()
2547
2548
2549 class HooksRunner(object):
2550   """Hook runner.
2551
2552   This class is instantiated on the node side (ganeti-noded) and not
2553   on the master side.
2554
2555   """
2556   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2557
2558   def __init__(self, hooks_base_dir=None):
2559     """Constructor for hooks runner.
2560
2561     @type hooks_base_dir: str or None
2562     @param hooks_base_dir: if not None, this overrides the
2563         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2564
2565     """
2566     if hooks_base_dir is None:
2567       hooks_base_dir = constants.HOOKS_BASE_DIR
2568     self._BASE_DIR = hooks_base_dir
2569
2570   @staticmethod
2571   def ExecHook(script, env):
2572     """Exec one hook script.
2573
2574     @type script: str
2575     @param script: the full path to the script
2576     @type env: dict
2577     @param env: the environment with which to exec the script
2578     @rtype: tuple (success, message)
2579     @return: a tuple of success and message, where success
2580         indicates the succes of the operation, and message
2581         which will contain the error details in case we
2582         failed
2583
2584     """
2585     # exec the process using subprocess and log the output
2586     fdstdin = None
2587     try:
2588       fdstdin = open("/dev/null", "r")
2589       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2590                                stderr=subprocess.STDOUT, close_fds=True,
2591                                shell=False, cwd="/", env=env)
2592       output = ""
2593       try:
2594         output = child.stdout.read(4096)
2595         child.stdout.close()
2596       except EnvironmentError, err:
2597         output += "Hook script error: %s" % str(err)
2598
2599       while True:
2600         try:
2601           result = child.wait()
2602           break
2603         except EnvironmentError, err:
2604           if err.errno == errno.EINTR:
2605             continue
2606           raise
2607     finally:
2608       # try not to leak fds
2609       for fd in (fdstdin, ):
2610         if fd is not None:
2611           try:
2612             fd.close()
2613           except EnvironmentError, err:
2614             # just log the error
2615             #logging.exception("Error while closing fd %s", fd)
2616             pass
2617
2618     return result == 0, utils.SafeEncode(output.strip())
2619
2620   def RunHooks(self, hpath, phase, env):
2621     """Run the scripts in the hooks directory.
2622
2623     @type hpath: str
2624     @param hpath: the path to the hooks directory which
2625         holds the scripts
2626     @type phase: str
2627     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2628         L{constants.HOOKS_PHASE_POST}
2629     @type env: dict
2630     @param env: dictionary with the environment for the hook
2631     @rtype: list
2632     @return: list of 3-element tuples:
2633       - script path
2634       - script result, either L{constants.HKR_SUCCESS} or
2635         L{constants.HKR_FAIL}
2636       - output of the script
2637
2638     @raise errors.ProgrammerError: for invalid input
2639         parameters
2640
2641     """
2642     if phase == constants.HOOKS_PHASE_PRE:
2643       suffix = "pre"
2644     elif phase == constants.HOOKS_PHASE_POST:
2645       suffix = "post"
2646     else:
2647       _Fail("Unknown hooks phase '%s'", phase)
2648
2649     rr = []
2650
2651     subdir = "%s-%s.d" % (hpath, suffix)
2652     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2653     try:
2654       dir_contents = utils.ListVisibleFiles(dir_name)
2655     except OSError:
2656       # FIXME: must log output in case of failures
2657       return rr
2658
2659     # we use the standard python sort order,
2660     # so 00name is the recommended naming scheme
2661     dir_contents.sort()
2662     for relname in dir_contents:
2663       fname = os.path.join(dir_name, relname)
2664       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2665           self.RE_MASK.match(relname) is not None):
2666         rrval = constants.HKR_SKIP
2667         output = ""
2668       else:
2669         result, output = self.ExecHook(fname, env)
2670         if not result:
2671           rrval = constants.HKR_FAIL
2672         else:
2673           rrval = constants.HKR_SUCCESS
2674       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2675
2676     return rr
2677
2678
2679 class IAllocatorRunner(object):
2680   """IAllocator runner.
2681
2682   This class is instantiated on the node side (ganeti-noded) and not on
2683   the master side.
2684
2685   """
2686   def Run(self, name, idata):
2687     """Run an iallocator script.
2688
2689     @type name: str
2690     @param name: the iallocator script name
2691     @type idata: str
2692     @param idata: the allocator input data
2693
2694     @rtype: tuple
2695     @return: two element tuple of:
2696        - status
2697        - either error message or stdout of allocator (for success)
2698
2699     """
2700     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2701                                   os.path.isfile)
2702     if alloc_script is None:
2703       _Fail("iallocator module '%s' not found in the search path", name)
2704
2705     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2706     try:
2707       os.write(fd, idata)
2708       os.close(fd)
2709       result = utils.RunCmd([alloc_script, fin_name])
2710       if result.failed:
2711         _Fail("iallocator module '%s' failed: %s, output '%s'",
2712               name, result.fail_reason, result.output)
2713     finally:
2714       os.unlink(fin_name)
2715
2716     return result.stdout
2717
2718
2719 class DevCacheManager(object):
2720   """Simple class for managing a cache of block device information.
2721
2722   """
2723   _DEV_PREFIX = "/dev/"
2724   _ROOT_DIR = constants.BDEV_CACHE_DIR
2725
2726   @classmethod
2727   def _ConvertPath(cls, dev_path):
2728     """Converts a /dev/name path to the cache file name.
2729
2730     This replaces slashes with underscores and strips the /dev
2731     prefix. It then returns the full path to the cache file.
2732
2733     @type dev_path: str
2734     @param dev_path: the C{/dev/} path name
2735     @rtype: str
2736     @return: the converted path name
2737
2738     """
2739     if dev_path.startswith(cls._DEV_PREFIX):
2740       dev_path = dev_path[len(cls._DEV_PREFIX):]
2741     dev_path = dev_path.replace("/", "_")
2742     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2743     return fpath
2744
2745   @classmethod
2746   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2747     """Updates the cache information for a given device.
2748
2749     @type dev_path: str
2750     @param dev_path: the pathname of the device
2751     @type owner: str
2752     @param owner: the owner (instance name) of the device
2753     @type on_primary: bool
2754     @param on_primary: whether this is the primary
2755         node nor not
2756     @type iv_name: str
2757     @param iv_name: the instance-visible name of the
2758         device, as in objects.Disk.iv_name
2759
2760     @rtype: None
2761
2762     """
2763     if dev_path is None:
2764       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2765       return
2766     fpath = cls._ConvertPath(dev_path)
2767     if on_primary:
2768       state = "primary"
2769     else:
2770       state = "secondary"
2771     if iv_name is None:
2772       iv_name = "not_visible"
2773     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2774     try:
2775       utils.WriteFile(fpath, data=fdata)
2776     except EnvironmentError, err:
2777       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2778
2779   @classmethod
2780   def RemoveCache(cls, dev_path):
2781     """Remove data for a dev_path.
2782
2783     This is just a wrapper over L{utils.RemoveFile} with a converted
2784     path name and logging.
2785
2786     @type dev_path: str
2787     @param dev_path: the pathname of the device
2788
2789     @rtype: None
2790
2791     """
2792     if dev_path is None:
2793       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2794       return
2795     fpath = cls._ConvertPath(dev_path)
2796     try:
2797       utils.RemoveFile(fpath)
2798     except EnvironmentError, err:
2799       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)