Change GetNodeDaemonPort to GetDaemonPort in utils
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 class RPCFail(Exception):
55   """Class denoting RPC failure.
56
57   Its argument is the error message.
58
59   """
60
61 def _Fail(msg, *args, **kwargs):
62   """Log an error and the raise an RPCFail exception.
63
64   This exception is then handled specially in the ganeti daemon and
65   turned into a 'failed' return type. As such, this function is a
66   useful shortcut for logging the error and returning it to the master
67   daemon.
68
69   @type msg: string
70   @param msg: the text of the exception
71   @raise RPCFail
72
73   """
74   if args:
75     msg = msg % args
76   if "log" not in kwargs or kwargs["log"]: # if we should log this error
77     if "exc" in kwargs and kwargs["exc"]:
78       logging.exception(msg)
79     else:
80       logging.error(msg)
81   raise RPCFail(msg)
82
83
84 def _GetConfig():
85   """Simple wrapper to return a SimpleStore.
86
87   @rtype: L{ssconf.SimpleStore}
88   @return: a SimpleStore instance
89
90   """
91   return ssconf.SimpleStore()
92
93
94 def _GetSshRunner(cluster_name):
95   """Simple wrapper to return an SshRunner.
96
97   @type cluster_name: str
98   @param cluster_name: the cluster name, which is needed
99       by the SshRunner constructor
100   @rtype: L{ssh.SshRunner}
101   @return: an SshRunner instance
102
103   """
104   return ssh.SshRunner(cluster_name)
105
106
107 def _Decompress(data):
108   """Unpacks data compressed by the RPC client.
109
110   @type data: list or tuple
111   @param data: Data sent by RPC client
112   @rtype: str
113   @return: Decompressed data
114
115   """
116   assert isinstance(data, (list, tuple))
117   assert len(data) == 2
118   (encoding, content) = data
119   if encoding == constants.RPC_ENCODING_NONE:
120     return content
121   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
122     return zlib.decompress(base64.b64decode(content))
123   else:
124     raise AssertionError("Unknown data encoding")
125
126
127 def _CleanDirectory(path, exclude=None):
128   """Removes all regular files in a directory.
129
130   @type path: str
131   @param path: the directory to clean
132   @type exclude: list
133   @param exclude: list of files to be excluded, defaults
134       to the empty list
135
136   """
137   if not os.path.isdir(path):
138     return
139   if exclude is None:
140     exclude = []
141   else:
142     # Normalize excluded paths
143     exclude = [os.path.normpath(i) for i in exclude]
144
145   for rel_name in utils.ListVisibleFiles(path):
146     full_name = os.path.normpath(os.path.join(path, rel_name))
147     if full_name in exclude:
148       continue
149     if os.path.isfile(full_name) and not os.path.islink(full_name):
150       utils.RemoveFile(full_name)
151
152
153 def _BuildUploadFileList():
154   """Build the list of allowed upload files.
155
156   This is abstracted so that it's built only once at module import time.
157
158   """
159   allowed_files = set([
160     constants.CLUSTER_CONF_FILE,
161     constants.ETC_HOSTS,
162     constants.SSH_KNOWN_HOSTS_FILE,
163     constants.VNC_PASSWORD_FILE,
164     constants.RAPI_CERT_FILE,
165     constants.RAPI_USERS_FILE,
166     constants.HMAC_CLUSTER_KEY,
167     ])
168
169   for hv_name in constants.HYPER_TYPES:
170     hv_class = hypervisor.GetHypervisorClass(hv_name)
171     allowed_files.update(hv_class.GetAncillaryFiles())
172
173   return frozenset(allowed_files)
174
175
176 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
177
178
179 def JobQueuePurge():
180   """Removes job queue files and archived jobs.
181
182   @rtype: tuple
183   @return: True, None
184
185   """
186   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
187   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
188
189
190 def GetMasterInfo():
191   """Returns master information.
192
193   This is an utility function to compute master information, either
194   for consumption here or from the node daemon.
195
196   @rtype: tuple
197   @return: master_netdev, master_ip, master_name
198   @raise RPCFail: in case of errors
199
200   """
201   try:
202     cfg = _GetConfig()
203     master_netdev = cfg.GetMasterNetdev()
204     master_ip = cfg.GetMasterIP()
205     master_node = cfg.GetMasterNode()
206   except errors.ConfigurationError, err:
207     _Fail("Cluster configuration incomplete: %s", err, exc=True)
208   return (master_netdev, master_ip, master_node)
209
210
211 def StartMaster(start_daemons, no_voting):
212   """Activate local node as master node.
213
214   The function will always try activate the IP address of the master
215   (unless someone else has it). It will also start the master daemons,
216   based on the start_daemons parameter.
217
218   @type start_daemons: boolean
219   @param start_daemons: whether to also start the master
220       daemons (ganeti-masterd and ganeti-rapi)
221   @type no_voting: boolean
222   @param no_voting: whether to start ganeti-masterd without a node vote
223       (if start_daemons is True), but still non-interactively
224   @rtype: None
225
226   """
227   # GetMasterInfo will raise an exception if not able to return data
228   master_netdev, master_ip, _ = GetMasterInfo()
229
230   err_msgs = []
231   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
232     if utils.OwnIpAddress(master_ip):
233       # we already have the ip:
234       logging.debug("Master IP already configured, doing nothing")
235     else:
236       msg = "Someone else has the master ip, not activating"
237       logging.error(msg)
238       err_msgs.append(msg)
239   else:
240     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
241                            "dev", master_netdev, "label",
242                            "%s:0" % master_netdev])
243     if result.failed:
244       msg = "Can't activate master IP: %s" % result.output
245       logging.error(msg)
246       err_msgs.append(msg)
247
248     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
249                            "-s", master_ip, master_ip])
250     # we'll ignore the exit code of arping
251
252   # and now start the master and rapi daemons
253   if start_daemons:
254     daemons_params = {
255         'ganeti-masterd': [],
256         'ganeti-rapi': [],
257         }
258     if no_voting:
259       daemons_params['ganeti-masterd'].append('--no-voting')
260       daemons_params['ganeti-masterd'].append('--yes-do-it')
261     for daemon in daemons_params:
262       cmd = [daemon]
263       cmd.extend(daemons_params[daemon])
264       result = utils.RunCmd(cmd)
265       if result.failed:
266         msg = "Can't start daemon %s: %s" % (daemon, result.output)
267         logging.error(msg)
268         err_msgs.append(msg)
269
270   if err_msgs:
271     _Fail("; ".join(err_msgs))
272
273
274 def StopMaster(stop_daemons):
275   """Deactivate this node as master.
276
277   The function will always try to deactivate the IP address of the
278   master. It will also stop the master daemons depending on the
279   stop_daemons parameter.
280
281   @type stop_daemons: boolean
282   @param stop_daemons: whether to also stop the master daemons
283       (ganeti-masterd and ganeti-rapi)
284   @rtype: None
285
286   """
287   # TODO: log and report back to the caller the error failures; we
288   # need to decide in which case we fail the RPC for this
289
290   # GetMasterInfo will raise an exception if not able to return data
291   master_netdev, master_ip, _ = GetMasterInfo()
292
293   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
294                          "dev", master_netdev])
295   if result.failed:
296     logging.error("Can't remove the master IP, error: %s", result.output)
297     # but otherwise ignore the failure
298
299   if stop_daemons:
300     # stop/kill the rapi and the master daemon
301     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
302       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
303
304
305 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
306   """Joins this node to the cluster.
307
308   This does the following:
309       - updates the hostkeys of the machine (rsa and dsa)
310       - adds the ssh private key to the user
311       - adds the ssh public key to the users' authorized_keys file
312
313   @type dsa: str
314   @param dsa: the DSA private key to write
315   @type dsapub: str
316   @param dsapub: the DSA public key to write
317   @type rsa: str
318   @param rsa: the RSA private key to write
319   @type rsapub: str
320   @param rsapub: the RSA public key to write
321   @type sshkey: str
322   @param sshkey: the SSH private key to write
323   @type sshpub: str
324   @param sshpub: the SSH public key to write
325   @rtype: boolean
326   @return: the success of the operation
327
328   """
329   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
330                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
331                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
332                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
333   for name, content, mode in sshd_keys:
334     utils.WriteFile(name, data=content, mode=mode)
335
336   try:
337     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
338                                                     mkdir=True)
339   except errors.OpExecError, err:
340     _Fail("Error while processing user ssh files: %s", err, exc=True)
341
342   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
343     utils.WriteFile(name, data=content, mode=0600)
344
345   utils.AddAuthorizedKey(auth_keys, sshpub)
346
347   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
348
349
350 def LeaveCluster():
351   """Cleans up and remove the current node.
352
353   This function cleans up and prepares the current node to be removed
354   from the cluster.
355
356   If processing is successful, then it raises an
357   L{errors.QuitGanetiException} which is used as a special case to
358   shutdown the node daemon.
359
360   """
361   _CleanDirectory(constants.DATA_DIR)
362   JobQueuePurge()
363
364   try:
365     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
366
367     f = open(pub_key, 'r')
368     try:
369       utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
370     finally:
371       f.close()
372
373     utils.RemoveFile(priv_key)
374     utils.RemoveFile(pub_key)
375   except errors.OpExecError:
376     logging.exception("Error while processing ssh files")
377
378   # Raise a custom exception (handled in ganeti-noded)
379   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
380
381
382 def GetNodeInfo(vgname, hypervisor_type):
383   """Gives back a hash with different information about the node.
384
385   @type vgname: C{string}
386   @param vgname: the name of the volume group to ask for disk space information
387   @type hypervisor_type: C{str}
388   @param hypervisor_type: the name of the hypervisor to ask for
389       memory information
390   @rtype: C{dict}
391   @return: dictionary with the following keys:
392       - vg_size is the size of the configured volume group in MiB
393       - vg_free is the free size of the volume group in MiB
394       - memory_dom0 is the memory allocated for domain0 in MiB
395       - memory_free is the currently available (free) ram in MiB
396       - memory_total is the total number of ram in MiB
397
398   """
399   outputarray = {}
400   vginfo = _GetVGInfo(vgname)
401   outputarray['vg_size'] = vginfo['vg_size']
402   outputarray['vg_free'] = vginfo['vg_free']
403
404   hyper = hypervisor.GetHypervisor(hypervisor_type)
405   hyp_info = hyper.GetNodeInfo()
406   if hyp_info is not None:
407     outputarray.update(hyp_info)
408
409   f = open("/proc/sys/kernel/random/boot_id", 'r')
410   try:
411     outputarray["bootid"] = f.read(128).rstrip("\n")
412   finally:
413     f.close()
414
415   return outputarray
416
417
418 def VerifyNode(what, cluster_name):
419   """Verify the status of the local node.
420
421   Based on the input L{what} parameter, various checks are done on the
422   local node.
423
424   If the I{filelist} key is present, this list of
425   files is checksummed and the file/checksum pairs are returned.
426
427   If the I{nodelist} key is present, we check that we have
428   connectivity via ssh with the target nodes (and check the hostname
429   report).
430
431   If the I{node-net-test} key is present, we check that we have
432   connectivity to the given nodes via both primary IP and, if
433   applicable, secondary IPs.
434
435   @type what: C{dict}
436   @param what: a dictionary of things to check:
437       - filelist: list of files for which to compute checksums
438       - nodelist: list of nodes we should check ssh communication with
439       - node-net-test: list of nodes we should check node daemon port
440         connectivity with
441       - hypervisor: list with hypervisors to run the verify for
442   @rtype: dict
443   @return: a dictionary with the same keys as the input dict, and
444       values representing the result of the checks
445
446   """
447   result = {}
448
449   if constants.NV_HYPERVISOR in what:
450     result[constants.NV_HYPERVISOR] = tmp = {}
451     for hv_name in what[constants.NV_HYPERVISOR]:
452       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
453
454   if constants.NV_FILELIST in what:
455     result[constants.NV_FILELIST] = utils.FingerprintFiles(
456       what[constants.NV_FILELIST])
457
458   if constants.NV_NODELIST in what:
459     result[constants.NV_NODELIST] = tmp = {}
460     random.shuffle(what[constants.NV_NODELIST])
461     for node in what[constants.NV_NODELIST]:
462       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
463       if not success:
464         tmp[node] = message
465
466   if constants.NV_NODENETTEST in what:
467     result[constants.NV_NODENETTEST] = tmp = {}
468     my_name = utils.HostInfo().name
469     my_pip = my_sip = None
470     for name, pip, sip in what[constants.NV_NODENETTEST]:
471       if name == my_name:
472         my_pip = pip
473         my_sip = sip
474         break
475     if not my_pip:
476       tmp[my_name] = ("Can't find my own primary/secondary IP"
477                       " in the node list")
478     else:
479       port = utils.GetDaemonPort(constants.NODED)
480       for name, pip, sip in what[constants.NV_NODENETTEST]:
481         fail = []
482         if not utils.TcpPing(pip, port, source=my_pip):
483           fail.append("primary")
484         if sip != pip:
485           if not utils.TcpPing(sip, port, source=my_sip):
486             fail.append("secondary")
487         if fail:
488           tmp[name] = ("failure using the %s interface(s)" %
489                        " and ".join(fail))
490
491   if constants.NV_LVLIST in what:
492     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
493
494   if constants.NV_INSTANCELIST in what:
495     result[constants.NV_INSTANCELIST] = GetInstanceList(
496       what[constants.NV_INSTANCELIST])
497
498   if constants.NV_VGLIST in what:
499     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
500
501   if constants.NV_VERSION in what:
502     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
503                                     constants.RELEASE_VERSION)
504
505   if constants.NV_HVINFO in what:
506     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
507     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
508
509   if constants.NV_DRBDLIST in what:
510     try:
511       used_minors = bdev.DRBD8.GetUsedDevs().keys()
512     except errors.BlockDeviceError, err:
513       logging.warning("Can't get used minors list", exc_info=True)
514       used_minors = str(err)
515     result[constants.NV_DRBDLIST] = used_minors
516
517   return result
518
519
520 def GetVolumeList(vg_name):
521   """Compute list of logical volumes and their size.
522
523   @type vg_name: str
524   @param vg_name: the volume group whose LVs we should list
525   @rtype: dict
526   @return:
527       dictionary of all partions (key) with value being a tuple of
528       their size (in MiB), inactive and online status::
529
530         {'test1': ('20.06', True, True)}
531
532       in case of errors, a string is returned with the error
533       details.
534
535   """
536   lvs = {}
537   sep = '|'
538   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
539                          "--separator=%s" % sep,
540                          "-olv_name,lv_size,lv_attr", vg_name])
541   if result.failed:
542     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
543
544   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
545   for line in result.stdout.splitlines():
546     line = line.strip()
547     match = valid_line_re.match(line)
548     if not match:
549       logging.error("Invalid line returned from lvs output: '%s'", line)
550       continue
551     name, size, attr = match.groups()
552     inactive = attr[4] == '-'
553     online = attr[5] == 'o'
554     lvs[name] = (size, inactive, online)
555
556   return lvs
557
558
559 def ListVolumeGroups():
560   """List the volume groups and their size.
561
562   @rtype: dict
563   @return: dictionary with keys volume name and values the
564       size of the volume
565
566   """
567   return utils.ListVolumeGroups()
568
569
570 def NodeVolumes():
571   """List all volumes on this node.
572
573   @rtype: list
574   @return:
575     A list of dictionaries, each having four keys:
576       - name: the logical volume name,
577       - size: the size of the logical volume
578       - dev: the physical device on which the LV lives
579       - vg: the volume group to which it belongs
580
581     In case of errors, we return an empty list and log the
582     error.
583
584     Note that since a logical volume can live on multiple physical
585     volumes, the resulting list might include a logical volume
586     multiple times.
587
588   """
589   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
590                          "--separator=|",
591                          "--options=lv_name,lv_size,devices,vg_name"])
592   if result.failed:
593     _Fail("Failed to list logical volumes, lvs output: %s",
594           result.output)
595
596   def parse_dev(dev):
597     if '(' in dev:
598       return dev.split('(')[0]
599     else:
600       return dev
601
602   def map_line(line):
603     return {
604       'name': line[0].strip(),
605       'size': line[1].strip(),
606       'dev': parse_dev(line[2].strip()),
607       'vg': line[3].strip(),
608     }
609
610   return [map_line(line.split('|')) for line in result.stdout.splitlines()
611           if line.count('|') >= 3]
612
613
614 def BridgesExist(bridges_list):
615   """Check if a list of bridges exist on the current node.
616
617   @rtype: boolean
618   @return: C{True} if all of them exist, C{False} otherwise
619
620   """
621   missing = []
622   for bridge in bridges_list:
623     if not utils.BridgeExists(bridge):
624       missing.append(bridge)
625
626   if missing:
627     _Fail("Missing bridges %s", ", ".join(missing))
628
629
630 def GetInstanceList(hypervisor_list):
631   """Provides a list of instances.
632
633   @type hypervisor_list: list
634   @param hypervisor_list: the list of hypervisors to query information
635
636   @rtype: list
637   @return: a list of all running instances on the current node
638     - instance1.example.com
639     - instance2.example.com
640
641   """
642   results = []
643   for hname in hypervisor_list:
644     try:
645       names = hypervisor.GetHypervisor(hname).ListInstances()
646       results.extend(names)
647     except errors.HypervisorError, err:
648       _Fail("Error enumerating instances (hypervisor %s): %s",
649             hname, err, exc=True)
650
651   return results
652
653
654 def GetInstanceInfo(instance, hname):
655   """Gives back the information about an instance as a dictionary.
656
657   @type instance: string
658   @param instance: the instance name
659   @type hname: string
660   @param hname: the hypervisor type of the instance
661
662   @rtype: dict
663   @return: dictionary with the following keys:
664       - memory: memory size of instance (int)
665       - state: xen state of instance (string)
666       - time: cpu time of instance (float)
667
668   """
669   output = {}
670
671   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
672   if iinfo is not None:
673     output['memory'] = iinfo[2]
674     output['state'] = iinfo[4]
675     output['time'] = iinfo[5]
676
677   return output
678
679
680 def GetInstanceMigratable(instance):
681   """Gives whether an instance can be migrated.
682
683   @type instance: L{objects.Instance}
684   @param instance: object representing the instance to be checked.
685
686   @rtype: tuple
687   @return: tuple of (result, description) where:
688       - result: whether the instance can be migrated or not
689       - description: a description of the issue, if relevant
690
691   """
692   hyper = hypervisor.GetHypervisor(instance.hypervisor)
693   iname = instance.name
694   if iname not in hyper.ListInstances():
695     _Fail("Instance %s is not running", iname)
696
697   for idx in range(len(instance.disks)):
698     link_name = _GetBlockDevSymlinkPath(iname, idx)
699     if not os.path.islink(link_name):
700       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
701
702
703 def GetAllInstancesInfo(hypervisor_list):
704   """Gather data about all instances.
705
706   This is the equivalent of L{GetInstanceInfo}, except that it
707   computes data for all instances at once, thus being faster if one
708   needs data about more than one instance.
709
710   @type hypervisor_list: list
711   @param hypervisor_list: list of hypervisors to query for instance data
712
713   @rtype: dict
714   @return: dictionary of instance: data, with data having the following keys:
715       - memory: memory size of instance (int)
716       - state: xen state of instance (string)
717       - time: cpu time of instance (float)
718       - vcpus: the number of vcpus
719
720   """
721   output = {}
722
723   for hname in hypervisor_list:
724     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
725     if iinfo:
726       for name, _, memory, vcpus, state, times in iinfo:
727         value = {
728           'memory': memory,
729           'vcpus': vcpus,
730           'state': state,
731           'time': times,
732           }
733         if name in output:
734           # we only check static parameters, like memory and vcpus,
735           # and not state and time which can change between the
736           # invocations of the different hypervisors
737           for key in 'memory', 'vcpus':
738             if value[key] != output[name][key]:
739               _Fail("Instance %s is running twice"
740                     " with different parameters", name)
741         output[name] = value
742
743   return output
744
745
746 def InstanceOsAdd(instance, reinstall):
747   """Add an OS to an instance.
748
749   @type instance: L{objects.Instance}
750   @param instance: Instance whose OS is to be installed
751   @type reinstall: boolean
752   @param reinstall: whether this is an instance reinstall
753   @rtype: None
754
755   """
756   inst_os = OSFromDisk(instance.os)
757
758   create_env = OSEnvironment(instance, inst_os)
759   if reinstall:
760     create_env['INSTANCE_REINSTALL'] = "1"
761
762   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
763                                      instance.name, int(time.time()))
764
765   result = utils.RunCmd([inst_os.create_script], env=create_env,
766                         cwd=inst_os.path, output=logfile,)
767   if result.failed:
768     logging.error("os create command '%s' returned error: %s, logfile: %s,"
769                   " output: %s", result.cmd, result.fail_reason, logfile,
770                   result.output)
771     lines = [utils.SafeEncode(val)
772              for val in utils.TailFile(logfile, lines=20)]
773     _Fail("OS create script failed (%s), last lines in the"
774           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
775
776
777 def RunRenameInstance(instance, old_name):
778   """Run the OS rename script for an instance.
779
780   @type instance: L{objects.Instance}
781   @param instance: Instance whose OS is to be installed
782   @type old_name: string
783   @param old_name: previous instance name
784   @rtype: boolean
785   @return: the success of the operation
786
787   """
788   inst_os = OSFromDisk(instance.os)
789
790   rename_env = OSEnvironment(instance, inst_os)
791   rename_env['OLD_INSTANCE_NAME'] = old_name
792
793   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
794                                            old_name,
795                                            instance.name, int(time.time()))
796
797   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
798                         cwd=inst_os.path, output=logfile)
799
800   if result.failed:
801     logging.error("os create command '%s' returned error: %s output: %s",
802                   result.cmd, result.fail_reason, result.output)
803     lines = [utils.SafeEncode(val)
804              for val in utils.TailFile(logfile, lines=20)]
805     _Fail("OS rename script failed (%s), last lines in the"
806           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
807
808
809 def _GetVGInfo(vg_name):
810   """Get information about the volume group.
811
812   @type vg_name: str
813   @param vg_name: the volume group which we query
814   @rtype: dict
815   @return:
816     A dictionary with the following keys:
817       - C{vg_size} is the total size of the volume group in MiB
818       - C{vg_free} is the free size of the volume group in MiB
819       - C{pv_count} are the number of physical disks in that VG
820
821     If an error occurs during gathering of data, we return the same dict
822     with keys all set to None.
823
824   """
825   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
826
827   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
828                          "--nosuffix", "--units=m", "--separator=:", vg_name])
829
830   if retval.failed:
831     logging.error("volume group %s not present", vg_name)
832     return retdic
833   valarr = retval.stdout.strip().rstrip(':').split(':')
834   if len(valarr) == 3:
835     try:
836       retdic = {
837         "vg_size": int(round(float(valarr[0]), 0)),
838         "vg_free": int(round(float(valarr[1]), 0)),
839         "pv_count": int(valarr[2]),
840         }
841     except ValueError, err:
842       logging.exception("Fail to parse vgs output: %s", err)
843   else:
844     logging.error("vgs output has the wrong number of fields (expected"
845                   " three): %s", str(valarr))
846   return retdic
847
848
849 def _GetBlockDevSymlinkPath(instance_name, idx):
850   return os.path.join(constants.DISK_LINKS_DIR,
851                       "%s:%d" % (instance_name, idx))
852
853
854 def _SymlinkBlockDev(instance_name, device_path, idx):
855   """Set up symlinks to a instance's block device.
856
857   This is an auxiliary function run when an instance is start (on the primary
858   node) or when an instance is migrated (on the target node).
859
860
861   @param instance_name: the name of the target instance
862   @param device_path: path of the physical block device, on the node
863   @param idx: the disk index
864   @return: absolute path to the disk's symlink
865
866   """
867   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
868   try:
869     os.symlink(device_path, link_name)
870   except OSError, err:
871     if err.errno == errno.EEXIST:
872       if (not os.path.islink(link_name) or
873           os.readlink(link_name) != device_path):
874         os.remove(link_name)
875         os.symlink(device_path, link_name)
876     else:
877       raise
878
879   return link_name
880
881
882 def _RemoveBlockDevLinks(instance_name, disks):
883   """Remove the block device symlinks belonging to the given instance.
884
885   """
886   for idx, _ in enumerate(disks):
887     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
888     if os.path.islink(link_name):
889       try:
890         os.remove(link_name)
891       except OSError:
892         logging.exception("Can't remove symlink '%s'", link_name)
893
894
895 def _GatherAndLinkBlockDevs(instance):
896   """Set up an instance's block device(s).
897
898   This is run on the primary node at instance startup. The block
899   devices must be already assembled.
900
901   @type instance: L{objects.Instance}
902   @param instance: the instance whose disks we shoul assemble
903   @rtype: list
904   @return: list of (disk_object, device_path)
905
906   """
907   block_devices = []
908   for idx, disk in enumerate(instance.disks):
909     device = _RecursiveFindBD(disk)
910     if device is None:
911       raise errors.BlockDeviceError("Block device '%s' is not set up." %
912                                     str(disk))
913     device.Open()
914     try:
915       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
916     except OSError, e:
917       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
918                                     e.strerror)
919
920     block_devices.append((disk, link_name))
921
922   return block_devices
923
924
925 def StartInstance(instance):
926   """Start an instance.
927
928   @type instance: L{objects.Instance}
929   @param instance: the instance object
930   @rtype: None
931
932   """
933   running_instances = GetInstanceList([instance.hypervisor])
934
935   if instance.name in running_instances:
936     logging.info("Instance %s already running, not starting", instance.name)
937     return
938
939   try:
940     block_devices = _GatherAndLinkBlockDevs(instance)
941     hyper = hypervisor.GetHypervisor(instance.hypervisor)
942     hyper.StartInstance(instance, block_devices)
943   except errors.BlockDeviceError, err:
944     _Fail("Block device error: %s", err, exc=True)
945   except errors.HypervisorError, err:
946     _RemoveBlockDevLinks(instance.name, instance.disks)
947     _Fail("Hypervisor error: %s", err, exc=True)
948
949
950 def InstanceShutdown(instance):
951   """Shut an instance down.
952
953   @note: this functions uses polling with a hardcoded timeout.
954
955   @type instance: L{objects.Instance}
956   @param instance: the instance object
957   @rtype: None
958
959   """
960   hv_name = instance.hypervisor
961   running_instances = GetInstanceList([hv_name])
962   iname = instance.name
963
964   if iname not in running_instances:
965     logging.info("Instance %s not running, doing nothing", iname)
966     return
967
968   hyper = hypervisor.GetHypervisor(hv_name)
969   try:
970     hyper.StopInstance(instance)
971   except errors.HypervisorError, err:
972     _Fail("Failed to stop instance %s: %s", iname, err)
973
974   # test every 10secs for 2min
975
976   time.sleep(1)
977   for _ in range(11):
978     if instance.name not in GetInstanceList([hv_name]):
979       break
980     time.sleep(10)
981   else:
982     # the shutdown did not succeed
983     logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
984
985     try:
986       hyper.StopInstance(instance, force=True)
987     except errors.HypervisorError, err:
988       _Fail("Failed to force stop instance %s: %s", iname, err)
989
990     time.sleep(1)
991     if instance.name in GetInstanceList([hv_name]):
992       _Fail("Could not shutdown instance %s even by destroy", iname)
993
994   _RemoveBlockDevLinks(iname, instance.disks)
995
996
997 def InstanceReboot(instance, reboot_type):
998   """Reboot an instance.
999
1000   @type instance: L{objects.Instance}
1001   @param instance: the instance object to reboot
1002   @type reboot_type: str
1003   @param reboot_type: the type of reboot, one the following
1004     constants:
1005       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1006         instance OS, do not recreate the VM
1007       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1008         restart the VM (at the hypervisor level)
1009       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1010         not accepted here, since that mode is handled differently, in
1011         cmdlib, and translates into full stop and start of the
1012         instance (instead of a call_instance_reboot RPC)
1013   @rtype: None
1014
1015   """
1016   running_instances = GetInstanceList([instance.hypervisor])
1017
1018   if instance.name not in running_instances:
1019     _Fail("Cannot reboot instance %s that is not running", instance.name)
1020
1021   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1022   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1023     try:
1024       hyper.RebootInstance(instance)
1025     except errors.HypervisorError, err:
1026       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1027   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1028     try:
1029       InstanceShutdown(instance)
1030       return StartInstance(instance)
1031     except errors.HypervisorError, err:
1032       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1033   else:
1034     _Fail("Invalid reboot_type received: %s", reboot_type)
1035
1036
1037 def MigrationInfo(instance):
1038   """Gather information about an instance to be migrated.
1039
1040   @type instance: L{objects.Instance}
1041   @param instance: the instance definition
1042
1043   """
1044   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1045   try:
1046     info = hyper.MigrationInfo(instance)
1047   except errors.HypervisorError, err:
1048     _Fail("Failed to fetch migration information: %s", err, exc=True)
1049   return info
1050
1051
1052 def AcceptInstance(instance, info, target):
1053   """Prepare the node to accept an instance.
1054
1055   @type instance: L{objects.Instance}
1056   @param instance: the instance definition
1057   @type info: string/data (opaque)
1058   @param info: migration information, from the source node
1059   @type target: string
1060   @param target: target host (usually ip), on this node
1061
1062   """
1063   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1064   try:
1065     hyper.AcceptInstance(instance, info, target)
1066   except errors.HypervisorError, err:
1067     _Fail("Failed to accept instance: %s", err, exc=True)
1068
1069
1070 def FinalizeMigration(instance, info, success):
1071   """Finalize any preparation to accept an instance.
1072
1073   @type instance: L{objects.Instance}
1074   @param instance: the instance definition
1075   @type info: string/data (opaque)
1076   @param info: migration information, from the source node
1077   @type success: boolean
1078   @param success: whether the migration was a success or a failure
1079
1080   """
1081   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1082   try:
1083     hyper.FinalizeMigration(instance, info, success)
1084   except errors.HypervisorError, err:
1085     _Fail("Failed to finalize migration: %s", err, exc=True)
1086
1087
1088 def MigrateInstance(instance, target, live):
1089   """Migrates an instance to another node.
1090
1091   @type instance: L{objects.Instance}
1092   @param instance: the instance definition
1093   @type target: string
1094   @param target: the target node name
1095   @type live: boolean
1096   @param live: whether the migration should be done live or not (the
1097       interpretation of this parameter is left to the hypervisor)
1098   @rtype: tuple
1099   @return: a tuple of (success, msg) where:
1100       - succes is a boolean denoting the success/failure of the operation
1101       - msg is a string with details in case of failure
1102
1103   """
1104   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1105
1106   try:
1107     hyper.MigrateInstance(instance.name, target, live)
1108   except errors.HypervisorError, err:
1109     _Fail("Failed to migrate instance: %s", err, exc=True)
1110
1111
1112 def BlockdevCreate(disk, size, owner, on_primary, info):
1113   """Creates a block device for an instance.
1114
1115   @type disk: L{objects.Disk}
1116   @param disk: the object describing the disk we should create
1117   @type size: int
1118   @param size: the size of the physical underlying device, in MiB
1119   @type owner: str
1120   @param owner: the name of the instance for which disk is created,
1121       used for device cache data
1122   @type on_primary: boolean
1123   @param on_primary:  indicates if it is the primary node or not
1124   @type info: string
1125   @param info: string that will be sent to the physical device
1126       creation, used for example to set (LVM) tags on LVs
1127
1128   @return: the new unique_id of the device (this can sometime be
1129       computed only after creation), or None. On secondary nodes,
1130       it's not required to return anything.
1131
1132   """
1133   clist = []
1134   if disk.children:
1135     for child in disk.children:
1136       try:
1137         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1138       except errors.BlockDeviceError, err:
1139         _Fail("Can't assemble device %s: %s", child, err)
1140       if on_primary or disk.AssembleOnSecondary():
1141         # we need the children open in case the device itself has to
1142         # be assembled
1143         try:
1144           crdev.Open()
1145         except errors.BlockDeviceError, err:
1146           _Fail("Can't make child '%s' read-write: %s", child, err)
1147       clist.append(crdev)
1148
1149   try:
1150     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1151   except errors.BlockDeviceError, err:
1152     _Fail("Can't create block device: %s", err)
1153
1154   if on_primary or disk.AssembleOnSecondary():
1155     try:
1156       device.Assemble()
1157     except errors.BlockDeviceError, err:
1158       _Fail("Can't assemble device after creation, unusual event: %s", err)
1159     device.SetSyncSpeed(constants.SYNC_SPEED)
1160     if on_primary or disk.OpenOnSecondary():
1161       try:
1162         device.Open(force=True)
1163       except errors.BlockDeviceError, err:
1164         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1165     DevCacheManager.UpdateCache(device.dev_path, owner,
1166                                 on_primary, disk.iv_name)
1167
1168   device.SetInfo(info)
1169
1170   return device.unique_id
1171
1172
1173 def BlockdevRemove(disk):
1174   """Remove a block device.
1175
1176   @note: This is intended to be called recursively.
1177
1178   @type disk: L{objects.Disk}
1179   @param disk: the disk object we should remove
1180   @rtype: boolean
1181   @return: the success of the operation
1182
1183   """
1184   msgs = []
1185   try:
1186     rdev = _RecursiveFindBD(disk)
1187   except errors.BlockDeviceError, err:
1188     # probably can't attach
1189     logging.info("Can't attach to device %s in remove", disk)
1190     rdev = None
1191   if rdev is not None:
1192     r_path = rdev.dev_path
1193     try:
1194       rdev.Remove()
1195     except errors.BlockDeviceError, err:
1196       msgs.append(str(err))
1197     if not msgs:
1198       DevCacheManager.RemoveCache(r_path)
1199
1200   if disk.children:
1201     for child in disk.children:
1202       try:
1203         BlockdevRemove(child)
1204       except RPCFail, err:
1205         msgs.append(str(err))
1206
1207   if msgs:
1208     _Fail("; ".join(msgs))
1209
1210
1211 def _RecursiveAssembleBD(disk, owner, as_primary):
1212   """Activate a block device for an instance.
1213
1214   This is run on the primary and secondary nodes for an instance.
1215
1216   @note: this function is called recursively.
1217
1218   @type disk: L{objects.Disk}
1219   @param disk: the disk we try to assemble
1220   @type owner: str
1221   @param owner: the name of the instance which owns the disk
1222   @type as_primary: boolean
1223   @param as_primary: if we should make the block device
1224       read/write
1225
1226   @return: the assembled device or None (in case no device
1227       was assembled)
1228   @raise errors.BlockDeviceError: in case there is an error
1229       during the activation of the children or the device
1230       itself
1231
1232   """
1233   children = []
1234   if disk.children:
1235     mcn = disk.ChildrenNeeded()
1236     if mcn == -1:
1237       mcn = 0 # max number of Nones allowed
1238     else:
1239       mcn = len(disk.children) - mcn # max number of Nones
1240     for chld_disk in disk.children:
1241       try:
1242         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1243       except errors.BlockDeviceError, err:
1244         if children.count(None) >= mcn:
1245           raise
1246         cdev = None
1247         logging.error("Error in child activation (but continuing): %s",
1248                       str(err))
1249       children.append(cdev)
1250
1251   if as_primary or disk.AssembleOnSecondary():
1252     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1253     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1254     result = r_dev
1255     if as_primary or disk.OpenOnSecondary():
1256       r_dev.Open()
1257     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1258                                 as_primary, disk.iv_name)
1259
1260   else:
1261     result = True
1262   return result
1263
1264
1265 def BlockdevAssemble(disk, owner, as_primary):
1266   """Activate a block device for an instance.
1267
1268   This is a wrapper over _RecursiveAssembleBD.
1269
1270   @rtype: str or boolean
1271   @return: a C{/dev/...} path for primary nodes, and
1272       C{True} for secondary nodes
1273
1274   """
1275   try:
1276     result = _RecursiveAssembleBD(disk, owner, as_primary)
1277     if isinstance(result, bdev.BlockDev):
1278       result = result.dev_path
1279   except errors.BlockDeviceError, err:
1280     _Fail("Error while assembling disk: %s", err, exc=True)
1281
1282   return result
1283
1284
1285 def BlockdevShutdown(disk):
1286   """Shut down a block device.
1287
1288   First, if the device is assembled (Attach() is successful), then
1289   the device is shutdown. Then the children of the device are
1290   shutdown.
1291
1292   This function is called recursively. Note that we don't cache the
1293   children or such, as oppossed to assemble, shutdown of different
1294   devices doesn't require that the upper device was active.
1295
1296   @type disk: L{objects.Disk}
1297   @param disk: the description of the disk we should
1298       shutdown
1299   @rtype: None
1300
1301   """
1302   msgs = []
1303   r_dev = _RecursiveFindBD(disk)
1304   if r_dev is not None:
1305     r_path = r_dev.dev_path
1306     try:
1307       r_dev.Shutdown()
1308       DevCacheManager.RemoveCache(r_path)
1309     except errors.BlockDeviceError, err:
1310       msgs.append(str(err))
1311
1312   if disk.children:
1313     for child in disk.children:
1314       try:
1315         BlockdevShutdown(child)
1316       except RPCFail, err:
1317         msgs.append(str(err))
1318
1319   if msgs:
1320     _Fail("; ".join(msgs))
1321
1322
1323 def BlockdevAddchildren(parent_cdev, new_cdevs):
1324   """Extend a mirrored block device.
1325
1326   @type parent_cdev: L{objects.Disk}
1327   @param parent_cdev: the disk to which we should add children
1328   @type new_cdevs: list of L{objects.Disk}
1329   @param new_cdevs: the list of children which we should add
1330   @rtype: None
1331
1332   """
1333   parent_bdev = _RecursiveFindBD(parent_cdev)
1334   if parent_bdev is None:
1335     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1336   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1337   if new_bdevs.count(None) > 0:
1338     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1339   parent_bdev.AddChildren(new_bdevs)
1340
1341
1342 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1343   """Shrink a mirrored block device.
1344
1345   @type parent_cdev: L{objects.Disk}
1346   @param parent_cdev: the disk from which we should remove children
1347   @type new_cdevs: list of L{objects.Disk}
1348   @param new_cdevs: the list of children which we should remove
1349   @rtype: None
1350
1351   """
1352   parent_bdev = _RecursiveFindBD(parent_cdev)
1353   if parent_bdev is None:
1354     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1355   devs = []
1356   for disk in new_cdevs:
1357     rpath = disk.StaticDevPath()
1358     if rpath is None:
1359       bd = _RecursiveFindBD(disk)
1360       if bd is None:
1361         _Fail("Can't find device %s while removing children", disk)
1362       else:
1363         devs.append(bd.dev_path)
1364     else:
1365       devs.append(rpath)
1366   parent_bdev.RemoveChildren(devs)
1367
1368
1369 def BlockdevGetmirrorstatus(disks):
1370   """Get the mirroring status of a list of devices.
1371
1372   @type disks: list of L{objects.Disk}
1373   @param disks: the list of disks which we should query
1374   @rtype: disk
1375   @return:
1376       a list of (mirror_done, estimated_time) tuples, which
1377       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1378   @raise errors.BlockDeviceError: if any of the disks cannot be
1379       found
1380
1381   """
1382   stats = []
1383   for dsk in disks:
1384     rbd = _RecursiveFindBD(dsk)
1385     if rbd is None:
1386       _Fail("Can't find device %s", dsk)
1387     stats.append(rbd.CombinedSyncStatus())
1388   return stats
1389
1390
1391 def _RecursiveFindBD(disk):
1392   """Check if a device is activated.
1393
1394   If so, return information about the real device.
1395
1396   @type disk: L{objects.Disk}
1397   @param disk: the disk object we need to find
1398
1399   @return: None if the device can't be found,
1400       otherwise the device instance
1401
1402   """
1403   children = []
1404   if disk.children:
1405     for chdisk in disk.children:
1406       children.append(_RecursiveFindBD(chdisk))
1407
1408   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1409
1410
1411 def BlockdevFind(disk):
1412   """Check if a device is activated.
1413
1414   If it is, return information about the real device.
1415
1416   @type disk: L{objects.Disk}
1417   @param disk: the disk to find
1418   @rtype: None or tuple
1419   @return: None if the disk cannot be found, otherwise a
1420       tuple (device_path, major, minor, sync_percent,
1421       estimated_time, is_degraded)
1422
1423   """
1424   try:
1425     rbd = _RecursiveFindBD(disk)
1426   except errors.BlockDeviceError, err:
1427     _Fail("Failed to find device: %s", err, exc=True)
1428   if rbd is None:
1429     return None
1430   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1431
1432
1433 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1434   """Write a file to the filesystem.
1435
1436   This allows the master to overwrite(!) a file. It will only perform
1437   the operation if the file belongs to a list of configuration files.
1438
1439   @type file_name: str
1440   @param file_name: the target file name
1441   @type data: str
1442   @param data: the new contents of the file
1443   @type mode: int
1444   @param mode: the mode to give the file (can be None)
1445   @type uid: int
1446   @param uid: the owner of the file (can be -1 for default)
1447   @type gid: int
1448   @param gid: the group of the file (can be -1 for default)
1449   @type atime: float
1450   @param atime: the atime to set on the file (can be None)
1451   @type mtime: float
1452   @param mtime: the mtime to set on the file (can be None)
1453   @rtype: None
1454
1455   """
1456   if not os.path.isabs(file_name):
1457     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1458
1459   if file_name not in _ALLOWED_UPLOAD_FILES:
1460     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1461           file_name)
1462
1463   raw_data = _Decompress(data)
1464
1465   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1466                   atime=atime, mtime=mtime)
1467
1468
1469 def WriteSsconfFiles(values):
1470   """Update all ssconf files.
1471
1472   Wrapper around the SimpleStore.WriteFiles.
1473
1474   """
1475   ssconf.SimpleStore().WriteFiles(values)
1476
1477
1478 def _ErrnoOrStr(err):
1479   """Format an EnvironmentError exception.
1480
1481   If the L{err} argument has an errno attribute, it will be looked up
1482   and converted into a textual C{E...} description. Otherwise the
1483   string representation of the error will be returned.
1484
1485   @type err: L{EnvironmentError}
1486   @param err: the exception to format
1487
1488   """
1489   if hasattr(err, 'errno'):
1490     detail = errno.errorcode[err.errno]
1491   else:
1492     detail = str(err)
1493   return detail
1494
1495
1496 def _OSOndiskAPIVersion(name, os_dir):
1497   """Compute and return the API version of a given OS.
1498
1499   This function will try to read the API version of the OS given by
1500   the 'name' parameter and residing in the 'os_dir' directory.
1501
1502   @type name: str
1503   @param name: the OS name we should look for
1504   @type os_dir: str
1505   @param os_dir: the directory inwhich we should look for the OS
1506   @rtype: tuple
1507   @return: tuple (status, data) with status denoting the validity and
1508       data holding either the vaid versions or an error message
1509
1510   """
1511   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1512
1513   try:
1514     st = os.stat(api_file)
1515   except EnvironmentError, err:
1516     return False, ("Required file 'ganeti_api_version' file not"
1517                    " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
1518
1519   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1520     return False, ("File 'ganeti_api_version' file at %s is not"
1521                    " a regular file" % os_dir)
1522
1523   try:
1524     api_versions = utils.ReadFile(api_file).splitlines()
1525   except EnvironmentError, err:
1526     return False, ("Error while reading the API version file at %s: %s" %
1527                    (api_file, _ErrnoOrStr(err)))
1528
1529   try:
1530     api_versions = [int(version.strip()) for version in api_versions]
1531   except (TypeError, ValueError), err:
1532     return False, ("API version(s) can't be converted to integer: %s" %
1533                    str(err))
1534
1535   return True, api_versions
1536
1537
1538 def DiagnoseOS(top_dirs=None):
1539   """Compute the validity for all OSes.
1540
1541   @type top_dirs: list
1542   @param top_dirs: the list of directories in which to
1543       search (if not given defaults to
1544       L{constants.OS_SEARCH_PATH})
1545   @rtype: list of L{objects.OS}
1546   @return: a list of tuples (name, path, status, diagnose)
1547       for all (potential) OSes under all search paths, where:
1548           - name is the (potential) OS name
1549           - path is the full path to the OS
1550           - status True/False is the validity of the OS
1551           - diagnose is the error message for an invalid OS, otherwise empty
1552
1553   """
1554   if top_dirs is None:
1555     top_dirs = constants.OS_SEARCH_PATH
1556
1557   result = []
1558   for dir_name in top_dirs:
1559     if os.path.isdir(dir_name):
1560       try:
1561         f_names = utils.ListVisibleFiles(dir_name)
1562       except EnvironmentError, err:
1563         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1564         break
1565       for name in f_names:
1566         os_path = os.path.sep.join([dir_name, name])
1567         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1568         if status:
1569           diagnose = ""
1570         else:
1571           diagnose = os_inst
1572         result.append((name, os_path, status, diagnose))
1573
1574   return result
1575
1576
1577 def _TryOSFromDisk(name, base_dir=None):
1578   """Create an OS instance from disk.
1579
1580   This function will return an OS instance if the given name is a
1581   valid OS name.
1582
1583   @type base_dir: string
1584   @keyword base_dir: Base directory containing OS installations.
1585                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1586   @rtype: tuple
1587   @return: success and either the OS instance if we find a valid one,
1588       or error message
1589
1590   """
1591   if base_dir is None:
1592     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1593     if os_dir is None:
1594       return False, "Directory for OS %s not found in search path" % name
1595   else:
1596     os_dir = os.path.sep.join([base_dir, name])
1597
1598   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1599   if not status:
1600     # push the error up
1601     return status, api_versions
1602
1603   if not constants.OS_API_VERSIONS.intersection(api_versions):
1604     return False, ("API version mismatch for path '%s': found %s, want %s." %
1605                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1606
1607   # OS Scripts dictionary, we will populate it with the actual script names
1608   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1609
1610   for script in os_scripts:
1611     os_scripts[script] = os.path.sep.join([os_dir, script])
1612
1613     try:
1614       st = os.stat(os_scripts[script])
1615     except EnvironmentError, err:
1616       return False, ("Script '%s' under path '%s' is missing (%s)" %
1617                      (script, os_dir, _ErrnoOrStr(err)))
1618
1619     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1620       return False, ("Script '%s' under path '%s' is not executable" %
1621                      (script, os_dir))
1622
1623     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1624       return False, ("Script '%s' under path '%s' is not a regular file" %
1625                      (script, os_dir))
1626
1627   os_obj = objects.OS(name=name, path=os_dir,
1628                       create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1629                       export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1630                       import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1631                       rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1632                       api_versions=api_versions)
1633   return True, os_obj
1634
1635
1636 def OSFromDisk(name, base_dir=None):
1637   """Create an OS instance from disk.
1638
1639   This function will return an OS instance if the given name is a
1640   valid OS name. Otherwise, it will raise an appropriate
1641   L{RPCFail} exception, detailing why this is not a valid OS.
1642
1643   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1644   an exception but returns true/false status data.
1645
1646   @type base_dir: string
1647   @keyword base_dir: Base directory containing OS installations.
1648                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1649   @rtype: L{objects.OS}
1650   @return: the OS instance if we find a valid one
1651   @raise RPCFail: if we don't find a valid OS
1652
1653   """
1654   status, payload = _TryOSFromDisk(name, base_dir)
1655
1656   if not status:
1657     _Fail(payload)
1658
1659   return payload
1660
1661
1662 def OSEnvironment(instance, os, debug=0):
1663   """Calculate the environment for an os script.
1664
1665   @type instance: L{objects.Instance}
1666   @param instance: target instance for the os script run
1667   @type os: L{objects.OS}
1668   @param os: operating system for which the environment is being built
1669   @type debug: integer
1670   @param debug: debug level (0 or 1, for OS Api 10)
1671   @rtype: dict
1672   @return: dict of environment variables
1673   @raise errors.BlockDeviceError: if the block device
1674       cannot be found
1675
1676   """
1677   result = {}
1678   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1679   result['OS_API_VERSION'] = '%d' % api_version
1680   result['INSTANCE_NAME'] = instance.name
1681   result['INSTANCE_OS'] = instance.os
1682   result['HYPERVISOR'] = instance.hypervisor
1683   result['DISK_COUNT'] = '%d' % len(instance.disks)
1684   result['NIC_COUNT'] = '%d' % len(instance.nics)
1685   result['DEBUG_LEVEL'] = '%d' % debug
1686   for idx, disk in enumerate(instance.disks):
1687     real_disk = _RecursiveFindBD(disk)
1688     if real_disk is None:
1689       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1690                                     str(disk))
1691     real_disk.Open()
1692     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1693     result['DISK_%d_ACCESS' % idx] = disk.mode
1694     if constants.HV_DISK_TYPE in instance.hvparams:
1695       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1696         instance.hvparams[constants.HV_DISK_TYPE]
1697     if disk.dev_type in constants.LDS_BLOCK:
1698       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1699     elif disk.dev_type == constants.LD_FILE:
1700       result['DISK_%d_BACKEND_TYPE' % idx] = \
1701         'file:%s' % disk.physical_id[0]
1702   for idx, nic in enumerate(instance.nics):
1703     result['NIC_%d_MAC' % idx] = nic.mac
1704     if nic.ip:
1705       result['NIC_%d_IP' % idx] = nic.ip
1706     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1707     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1708       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1709     if nic.nicparams[constants.NIC_LINK]:
1710       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1711     if constants.HV_NIC_TYPE in instance.hvparams:
1712       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1713         instance.hvparams[constants.HV_NIC_TYPE]
1714
1715   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1716     for key, value in source.items():
1717       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1718
1719   return result
1720
1721 def BlockdevGrow(disk, amount):
1722   """Grow a stack of block devices.
1723
1724   This function is called recursively, with the childrens being the
1725   first ones to resize.
1726
1727   @type disk: L{objects.Disk}
1728   @param disk: the disk to be grown
1729   @rtype: (status, result)
1730   @return: a tuple with the status of the operation
1731       (True/False), and the errors message if status
1732       is False
1733
1734   """
1735   r_dev = _RecursiveFindBD(disk)
1736   if r_dev is None:
1737     _Fail("Cannot find block device %s", disk)
1738
1739   try:
1740     r_dev.Grow(amount)
1741   except errors.BlockDeviceError, err:
1742     _Fail("Failed to grow block device: %s", err, exc=True)
1743
1744
1745 def BlockdevSnapshot(disk):
1746   """Create a snapshot copy of a block device.
1747
1748   This function is called recursively, and the snapshot is actually created
1749   just for the leaf lvm backend device.
1750
1751   @type disk: L{objects.Disk}
1752   @param disk: the disk to be snapshotted
1753   @rtype: string
1754   @return: snapshot disk path
1755
1756   """
1757   if disk.children:
1758     if len(disk.children) == 1:
1759       # only one child, let's recurse on it
1760       return BlockdevSnapshot(disk.children[0])
1761     else:
1762       # more than one child, choose one that matches
1763       for child in disk.children:
1764         if child.size == disk.size:
1765           # return implies breaking the loop
1766           return BlockdevSnapshot(child)
1767   elif disk.dev_type == constants.LD_LV:
1768     r_dev = _RecursiveFindBD(disk)
1769     if r_dev is not None:
1770       # let's stay on the safe side and ask for the full size, for now
1771       return r_dev.Snapshot(disk.size)
1772     else:
1773       _Fail("Cannot find block device %s", disk)
1774   else:
1775     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1776           disk.unique_id, disk.dev_type)
1777
1778
1779 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1780   """Export a block device snapshot to a remote node.
1781
1782   @type disk: L{objects.Disk}
1783   @param disk: the description of the disk to export
1784   @type dest_node: str
1785   @param dest_node: the destination node to export to
1786   @type instance: L{objects.Instance}
1787   @param instance: the instance object to whom the disk belongs
1788   @type cluster_name: str
1789   @param cluster_name: the cluster name, needed for SSH hostalias
1790   @type idx: int
1791   @param idx: the index of the disk in the instance's disk list,
1792       used to export to the OS scripts environment
1793   @rtype: None
1794
1795   """
1796   inst_os = OSFromDisk(instance.os)
1797   export_env = OSEnvironment(instance, inst_os)
1798
1799   export_script = inst_os.export_script
1800
1801   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1802                                      instance.name, int(time.time()))
1803   if not os.path.exists(constants.LOG_OS_DIR):
1804     os.mkdir(constants.LOG_OS_DIR, 0750)
1805   real_disk = _RecursiveFindBD(disk)
1806   if real_disk is None:
1807     _Fail("Block device '%s' is not set up", disk)
1808
1809   real_disk.Open()
1810
1811   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1812   export_env['EXPORT_INDEX'] = str(idx)
1813
1814   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1815   destfile = disk.physical_id[1]
1816
1817   # the target command is built out of three individual commands,
1818   # which are joined by pipes; we check each individual command for
1819   # valid parameters
1820   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1821                                export_script, logfile)
1822
1823   comprcmd = "gzip"
1824
1825   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1826                                 destdir, destdir, destfile)
1827   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1828                                                    constants.GANETI_RUNAS,
1829                                                    destcmd)
1830
1831   # all commands have been checked, so we're safe to combine them
1832   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1833
1834   result = utils.RunCmd(command, env=export_env)
1835
1836   if result.failed:
1837     _Fail("OS snapshot export command '%s' returned error: %s"
1838           " output: %s", command, result.fail_reason, result.output)
1839
1840
1841 def FinalizeExport(instance, snap_disks):
1842   """Write out the export configuration information.
1843
1844   @type instance: L{objects.Instance}
1845   @param instance: the instance which we export, used for
1846       saving configuration
1847   @type snap_disks: list of L{objects.Disk}
1848   @param snap_disks: list of snapshot block devices, which
1849       will be used to get the actual name of the dump file
1850
1851   @rtype: None
1852
1853   """
1854   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1855   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1856
1857   config = objects.SerializableConfigParser()
1858
1859   config.add_section(constants.INISECT_EXP)
1860   config.set(constants.INISECT_EXP, 'version', '0')
1861   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1862   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1863   config.set(constants.INISECT_EXP, 'os', instance.os)
1864   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1865
1866   config.add_section(constants.INISECT_INS)
1867   config.set(constants.INISECT_INS, 'name', instance.name)
1868   config.set(constants.INISECT_INS, 'memory', '%d' %
1869              instance.beparams[constants.BE_MEMORY])
1870   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1871              instance.beparams[constants.BE_VCPUS])
1872   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1873
1874   nic_total = 0
1875   for nic_count, nic in enumerate(instance.nics):
1876     nic_total += 1
1877     config.set(constants.INISECT_INS, 'nic%d_mac' %
1878                nic_count, '%s' % nic.mac)
1879     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1880     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1881                '%s' % nic.bridge)
1882   # TODO: redundant: on load can read nics until it doesn't exist
1883   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1884
1885   disk_total = 0
1886   for disk_count, disk in enumerate(snap_disks):
1887     if disk:
1888       disk_total += 1
1889       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1890                  ('%s' % disk.iv_name))
1891       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1892                  ('%s' % disk.physical_id[1]))
1893       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1894                  ('%d' % disk.size))
1895
1896   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1897
1898   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1899                   data=config.Dumps())
1900   shutil.rmtree(finaldestdir, True)
1901   shutil.move(destdir, finaldestdir)
1902
1903
1904 def ExportInfo(dest):
1905   """Get export configuration information.
1906
1907   @type dest: str
1908   @param dest: directory containing the export
1909
1910   @rtype: L{objects.SerializableConfigParser}
1911   @return: a serializable config file containing the
1912       export info
1913
1914   """
1915   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1916
1917   config = objects.SerializableConfigParser()
1918   config.read(cff)
1919
1920   if (not config.has_section(constants.INISECT_EXP) or
1921       not config.has_section(constants.INISECT_INS)):
1922     _Fail("Export info file doesn't have the required fields")
1923
1924   return config.Dumps()
1925
1926
1927 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1928   """Import an os image into an instance.
1929
1930   @type instance: L{objects.Instance}
1931   @param instance: instance to import the disks into
1932   @type src_node: string
1933   @param src_node: source node for the disk images
1934   @type src_images: list of string
1935   @param src_images: absolute paths of the disk images
1936   @rtype: list of boolean
1937   @return: each boolean represent the success of importing the n-th disk
1938
1939   """
1940   inst_os = OSFromDisk(instance.os)
1941   import_env = OSEnvironment(instance, inst_os)
1942   import_script = inst_os.import_script
1943
1944   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1945                                         instance.name, int(time.time()))
1946   if not os.path.exists(constants.LOG_OS_DIR):
1947     os.mkdir(constants.LOG_OS_DIR, 0750)
1948
1949   comprcmd = "gunzip"
1950   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1951                                import_script, logfile)
1952
1953   final_result = []
1954   for idx, image in enumerate(src_images):
1955     if image:
1956       destcmd = utils.BuildShellCmd('cat %s', image)
1957       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1958                                                        constants.GANETI_RUNAS,
1959                                                        destcmd)
1960       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1961       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1962       import_env['IMPORT_INDEX'] = str(idx)
1963       result = utils.RunCmd(command, env=import_env)
1964       if result.failed:
1965         logging.error("Disk import command '%s' returned error: %s"
1966                       " output: %s", command, result.fail_reason,
1967                       result.output)
1968         final_result.append("error importing disk %d: %s, %s" %
1969                             (idx, result.fail_reason, result.output[-100]))
1970
1971   if final_result:
1972     _Fail("; ".join(final_result), log=False)
1973
1974
1975 def ListExports():
1976   """Return a list of exports currently available on this machine.
1977
1978   @rtype: list
1979   @return: list of the exports
1980
1981   """
1982   if os.path.isdir(constants.EXPORT_DIR):
1983     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1984   else:
1985     _Fail("No exports directory")
1986
1987
1988 def RemoveExport(export):
1989   """Remove an existing export from the node.
1990
1991   @type export: str
1992   @param export: the name of the export to remove
1993   @rtype: None
1994
1995   """
1996   target = os.path.join(constants.EXPORT_DIR, export)
1997
1998   try:
1999     shutil.rmtree(target)
2000   except EnvironmentError, err:
2001     _Fail("Error while removing the export: %s", err, exc=True)
2002
2003
2004 def BlockdevRename(devlist):
2005   """Rename a list of block devices.
2006
2007   @type devlist: list of tuples
2008   @param devlist: list of tuples of the form  (disk,
2009       new_logical_id, new_physical_id); disk is an
2010       L{objects.Disk} object describing the current disk,
2011       and new logical_id/physical_id is the name we
2012       rename it to
2013   @rtype: boolean
2014   @return: True if all renames succeeded, False otherwise
2015
2016   """
2017   msgs = []
2018   result = True
2019   for disk, unique_id in devlist:
2020     dev = _RecursiveFindBD(disk)
2021     if dev is None:
2022       msgs.append("Can't find device %s in rename" % str(disk))
2023       result = False
2024       continue
2025     try:
2026       old_rpath = dev.dev_path
2027       dev.Rename(unique_id)
2028       new_rpath = dev.dev_path
2029       if old_rpath != new_rpath:
2030         DevCacheManager.RemoveCache(old_rpath)
2031         # FIXME: we should add the new cache information here, like:
2032         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2033         # but we don't have the owner here - maybe parse from existing
2034         # cache? for now, we only lose lvm data when we rename, which
2035         # is less critical than DRBD or MD
2036     except errors.BlockDeviceError, err:
2037       msgs.append("Can't rename device '%s' to '%s': %s" %
2038                   (dev, unique_id, err))
2039       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2040       result = False
2041   if not result:
2042     _Fail("; ".join(msgs))
2043
2044
2045 def _TransformFileStorageDir(file_storage_dir):
2046   """Checks whether given file_storage_dir is valid.
2047
2048   Checks wheter the given file_storage_dir is within the cluster-wide
2049   default file_storage_dir stored in SimpleStore. Only paths under that
2050   directory are allowed.
2051
2052   @type file_storage_dir: str
2053   @param file_storage_dir: the path to check
2054
2055   @return: the normalized path if valid, None otherwise
2056
2057   """
2058   cfg = _GetConfig()
2059   file_storage_dir = os.path.normpath(file_storage_dir)
2060   base_file_storage_dir = cfg.GetFileStorageDir()
2061   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2062       base_file_storage_dir):
2063     _Fail("File storage directory '%s' is not under base file"
2064           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2065   return file_storage_dir
2066
2067
2068 def CreateFileStorageDir(file_storage_dir):
2069   """Create file storage directory.
2070
2071   @type file_storage_dir: str
2072   @param file_storage_dir: directory to create
2073
2074   @rtype: tuple
2075   @return: tuple with first element a boolean indicating wheter dir
2076       creation was successful or not
2077
2078   """
2079   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2080   if os.path.exists(file_storage_dir):
2081     if not os.path.isdir(file_storage_dir):
2082       _Fail("Specified storage dir '%s' is not a directory",
2083             file_storage_dir)
2084   else:
2085     try:
2086       os.makedirs(file_storage_dir, 0750)
2087     except OSError, err:
2088       _Fail("Cannot create file storage directory '%s': %s",
2089             file_storage_dir, err, exc=True)
2090
2091
2092 def RemoveFileStorageDir(file_storage_dir):
2093   """Remove file storage directory.
2094
2095   Remove it only if it's empty. If not log an error and return.
2096
2097   @type file_storage_dir: str
2098   @param file_storage_dir: the directory we should cleanup
2099   @rtype: tuple (success,)
2100   @return: tuple of one element, C{success}, denoting
2101       whether the operation was successful
2102
2103   """
2104   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2105   if os.path.exists(file_storage_dir):
2106     if not os.path.isdir(file_storage_dir):
2107       _Fail("Specified Storage directory '%s' is not a directory",
2108             file_storage_dir)
2109     # deletes dir only if empty, otherwise we want to fail the rpc call
2110     try:
2111       os.rmdir(file_storage_dir)
2112     except OSError, err:
2113       _Fail("Cannot remove file storage directory '%s': %s",
2114             file_storage_dir, err)
2115
2116
2117 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2118   """Rename the file storage directory.
2119
2120   @type old_file_storage_dir: str
2121   @param old_file_storage_dir: the current path
2122   @type new_file_storage_dir: str
2123   @param new_file_storage_dir: the name we should rename to
2124   @rtype: tuple (success,)
2125   @return: tuple of one element, C{success}, denoting
2126       whether the operation was successful
2127
2128   """
2129   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2130   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2131   if not os.path.exists(new_file_storage_dir):
2132     if os.path.isdir(old_file_storage_dir):
2133       try:
2134         os.rename(old_file_storage_dir, new_file_storage_dir)
2135       except OSError, err:
2136         _Fail("Cannot rename '%s' to '%s': %s",
2137               old_file_storage_dir, new_file_storage_dir, err)
2138     else:
2139       _Fail("Specified storage dir '%s' is not a directory",
2140             old_file_storage_dir)
2141   else:
2142     if os.path.exists(old_file_storage_dir):
2143       _Fail("Cannot rename '%s' to '%s': both locations exist",
2144             old_file_storage_dir, new_file_storage_dir)
2145
2146
2147 def _EnsureJobQueueFile(file_name):
2148   """Checks whether the given filename is in the queue directory.
2149
2150   @type file_name: str
2151   @param file_name: the file name we should check
2152   @rtype: None
2153   @raises RPCFail: if the file is not valid
2154
2155   """
2156   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2157   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2158
2159   if not result:
2160     _Fail("Passed job queue file '%s' does not belong to"
2161           " the queue directory '%s'", file_name, queue_dir)
2162
2163
2164 def JobQueueUpdate(file_name, content):
2165   """Updates a file in the queue directory.
2166
2167   This is just a wrapper over L{utils.WriteFile}, with proper
2168   checking.
2169
2170   @type file_name: str
2171   @param file_name: the job file name
2172   @type content: str
2173   @param content: the new job contents
2174   @rtype: boolean
2175   @return: the success of the operation
2176
2177   """
2178   _EnsureJobQueueFile(file_name)
2179
2180   # Write and replace the file atomically
2181   utils.WriteFile(file_name, data=_Decompress(content))
2182
2183
2184 def JobQueueRename(old, new):
2185   """Renames a job queue file.
2186
2187   This is just a wrapper over os.rename with proper checking.
2188
2189   @type old: str
2190   @param old: the old (actual) file name
2191   @type new: str
2192   @param new: the desired file name
2193   @rtype: tuple
2194   @return: the success of the operation and payload
2195
2196   """
2197   _EnsureJobQueueFile(old)
2198   _EnsureJobQueueFile(new)
2199
2200   utils.RenameFile(old, new, mkdir=True)
2201
2202
2203 def JobQueueSetDrainFlag(drain_flag):
2204   """Set the drain flag for the queue.
2205
2206   This will set or unset the queue drain flag.
2207
2208   @type drain_flag: boolean
2209   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2210   @rtype: truple
2211   @return: always True, None
2212   @warning: the function always returns True
2213
2214   """
2215   if drain_flag:
2216     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2217   else:
2218     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2219
2220
2221 def BlockdevClose(instance_name, disks):
2222   """Closes the given block devices.
2223
2224   This means they will be switched to secondary mode (in case of
2225   DRBD).
2226
2227   @param instance_name: if the argument is not empty, the symlinks
2228       of this instance will be removed
2229   @type disks: list of L{objects.Disk}
2230   @param disks: the list of disks to be closed
2231   @rtype: tuple (success, message)
2232   @return: a tuple of success and message, where success
2233       indicates the succes of the operation, and message
2234       which will contain the error details in case we
2235       failed
2236
2237   """
2238   bdevs = []
2239   for cf in disks:
2240     rd = _RecursiveFindBD(cf)
2241     if rd is None:
2242       _Fail("Can't find device %s", cf)
2243     bdevs.append(rd)
2244
2245   msg = []
2246   for rd in bdevs:
2247     try:
2248       rd.Close()
2249     except errors.BlockDeviceError, err:
2250       msg.append(str(err))
2251   if msg:
2252     _Fail("Can't make devices secondary: %s", ",".join(msg))
2253   else:
2254     if instance_name:
2255       _RemoveBlockDevLinks(instance_name, disks)
2256
2257
2258 def ValidateHVParams(hvname, hvparams):
2259   """Validates the given hypervisor parameters.
2260
2261   @type hvname: string
2262   @param hvname: the hypervisor name
2263   @type hvparams: dict
2264   @param hvparams: the hypervisor parameters to be validated
2265   @rtype: None
2266
2267   """
2268   try:
2269     hv_type = hypervisor.GetHypervisor(hvname)
2270     hv_type.ValidateParameters(hvparams)
2271   except errors.HypervisorError, err:
2272     _Fail(str(err), log=False)
2273
2274
2275 def DemoteFromMC():
2276   """Demotes the current node from master candidate role.
2277
2278   """
2279   # try to ensure we're not the master by mistake
2280   master, myself = ssconf.GetMasterAndMyself()
2281   if master == myself:
2282     _Fail("ssconf status shows I'm the master node, will not demote")
2283   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2284   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2285     _Fail("The master daemon is running, will not demote")
2286   try:
2287     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2288       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2289   except EnvironmentError, err:
2290     if err.errno != errno.ENOENT:
2291       _Fail("Error while backing up cluster file: %s", err, exc=True)
2292   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2293
2294
2295 def _FindDisks(nodes_ip, disks):
2296   """Sets the physical ID on disks and returns the block devices.
2297
2298   """
2299   # set the correct physical ID
2300   my_name = utils.HostInfo().name
2301   for cf in disks:
2302     cf.SetPhysicalID(my_name, nodes_ip)
2303
2304   bdevs = []
2305
2306   for cf in disks:
2307     rd = _RecursiveFindBD(cf)
2308     if rd is None:
2309       _Fail("Can't find device %s", cf)
2310     bdevs.append(rd)
2311   return bdevs
2312
2313
2314 def DrbdDisconnectNet(nodes_ip, disks):
2315   """Disconnects the network on a list of drbd devices.
2316
2317   """
2318   bdevs = _FindDisks(nodes_ip, disks)
2319
2320   # disconnect disks
2321   for rd in bdevs:
2322     try:
2323       rd.DisconnectNet()
2324     except errors.BlockDeviceError, err:
2325       _Fail("Can't change network configuration to standalone mode: %s",
2326             err, exc=True)
2327
2328
2329 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2330   """Attaches the network on a list of drbd devices.
2331
2332   """
2333   bdevs = _FindDisks(nodes_ip, disks)
2334
2335   if multimaster:
2336     for idx, rd in enumerate(bdevs):
2337       try:
2338         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2339       except EnvironmentError, err:
2340         _Fail("Can't create symlink: %s", err)
2341   # reconnect disks, switch to new master configuration and if
2342   # needed primary mode
2343   for rd in bdevs:
2344     try:
2345       rd.AttachNet(multimaster)
2346     except errors.BlockDeviceError, err:
2347       _Fail("Can't change network configuration: %s", err)
2348   # wait until the disks are connected; we need to retry the re-attach
2349   # if the device becomes standalone, as this might happen if the one
2350   # node disconnects and reconnects in a different mode before the
2351   # other node reconnects; in this case, one or both of the nodes will
2352   # decide it has wrong configuration and switch to standalone
2353   RECONNECT_TIMEOUT = 2 * 60
2354   sleep_time = 0.100 # start with 100 miliseconds
2355   timeout_limit = time.time() + RECONNECT_TIMEOUT
2356   while time.time() < timeout_limit:
2357     all_connected = True
2358     for rd in bdevs:
2359       stats = rd.GetProcStatus()
2360       if not (stats.is_connected or stats.is_in_resync):
2361         all_connected = False
2362       if stats.is_standalone:
2363         # peer had different config info and this node became
2364         # standalone, even though this should not happen with the
2365         # new staged way of changing disk configs
2366         try:
2367           rd.AttachNet(multimaster)
2368         except errors.BlockDeviceError, err:
2369           _Fail("Can't change network configuration: %s", err)
2370     if all_connected:
2371       break
2372     time.sleep(sleep_time)
2373     sleep_time = min(5, sleep_time * 1.5)
2374   if not all_connected:
2375     _Fail("Timeout in disk reconnecting")
2376   if multimaster:
2377     # change to primary mode
2378     for rd in bdevs:
2379       try:
2380         rd.Open()
2381       except errors.BlockDeviceError, err:
2382         _Fail("Can't change to primary mode: %s", err)
2383
2384
2385 def DrbdWaitSync(nodes_ip, disks):
2386   """Wait until DRBDs have synchronized.
2387
2388   """
2389   bdevs = _FindDisks(nodes_ip, disks)
2390
2391   min_resync = 100
2392   alldone = True
2393   for rd in bdevs:
2394     stats = rd.GetProcStatus()
2395     if not (stats.is_connected or stats.is_in_resync):
2396       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2397     alldone = alldone and (not stats.is_in_resync)
2398     if stats.sync_percent is not None:
2399       min_resync = min(min_resync, stats.sync_percent)
2400
2401   return (alldone, min_resync)
2402
2403
2404 def PowercycleNode(hypervisor_type):
2405   """Hard-powercycle the node.
2406
2407   Because we need to return first, and schedule the powercycle in the
2408   background, we won't be able to report failures nicely.
2409
2410   """
2411   hyper = hypervisor.GetHypervisor(hypervisor_type)
2412   try:
2413     pid = os.fork()
2414   except OSError:
2415     # if we can't fork, we'll pretend that we're in the child process
2416     pid = 0
2417   if pid > 0:
2418     return "Reboot scheduled in 5 seconds"
2419   time.sleep(5)
2420   hyper.PowercycleNode()
2421
2422
2423 class HooksRunner(object):
2424   """Hook runner.
2425
2426   This class is instantiated on the node side (ganeti-noded) and not
2427   on the master side.
2428
2429   """
2430   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2431
2432   def __init__(self, hooks_base_dir=None):
2433     """Constructor for hooks runner.
2434
2435     @type hooks_base_dir: str or None
2436     @param hooks_base_dir: if not None, this overrides the
2437         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2438
2439     """
2440     if hooks_base_dir is None:
2441       hooks_base_dir = constants.HOOKS_BASE_DIR
2442     self._BASE_DIR = hooks_base_dir
2443
2444   @staticmethod
2445   def ExecHook(script, env):
2446     """Exec one hook script.
2447
2448     @type script: str
2449     @param script: the full path to the script
2450     @type env: dict
2451     @param env: the environment with which to exec the script
2452     @rtype: tuple (success, message)
2453     @return: a tuple of success and message, where success
2454         indicates the succes of the operation, and message
2455         which will contain the error details in case we
2456         failed
2457
2458     """
2459     # exec the process using subprocess and log the output
2460     fdstdin = None
2461     try:
2462       fdstdin = open("/dev/null", "r")
2463       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2464                                stderr=subprocess.STDOUT, close_fds=True,
2465                                shell=False, cwd="/", env=env)
2466       output = ""
2467       try:
2468         output = child.stdout.read(4096)
2469         child.stdout.close()
2470       except EnvironmentError, err:
2471         output += "Hook script error: %s" % str(err)
2472
2473       while True:
2474         try:
2475           result = child.wait()
2476           break
2477         except EnvironmentError, err:
2478           if err.errno == errno.EINTR:
2479             continue
2480           raise
2481     finally:
2482       # try not to leak fds
2483       for fd in (fdstdin, ):
2484         if fd is not None:
2485           try:
2486             fd.close()
2487           except EnvironmentError, err:
2488             # just log the error
2489             #logging.exception("Error while closing fd %s", fd)
2490             pass
2491
2492     return result == 0, utils.SafeEncode(output.strip())
2493
2494   def RunHooks(self, hpath, phase, env):
2495     """Run the scripts in the hooks directory.
2496
2497     @type hpath: str
2498     @param hpath: the path to the hooks directory which
2499         holds the scripts
2500     @type phase: str
2501     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2502         L{constants.HOOKS_PHASE_POST}
2503     @type env: dict
2504     @param env: dictionary with the environment for the hook
2505     @rtype: list
2506     @return: list of 3-element tuples:
2507       - script path
2508       - script result, either L{constants.HKR_SUCCESS} or
2509         L{constants.HKR_FAIL}
2510       - output of the script
2511
2512     @raise errors.ProgrammerError: for invalid input
2513         parameters
2514
2515     """
2516     if phase == constants.HOOKS_PHASE_PRE:
2517       suffix = "pre"
2518     elif phase == constants.HOOKS_PHASE_POST:
2519       suffix = "post"
2520     else:
2521       _Fail("Unknown hooks phase '%s'", phase)
2522
2523     rr = []
2524
2525     subdir = "%s-%s.d" % (hpath, suffix)
2526     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2527     try:
2528       dir_contents = utils.ListVisibleFiles(dir_name)
2529     except OSError:
2530       # FIXME: must log output in case of failures
2531       return rr
2532
2533     # we use the standard python sort order,
2534     # so 00name is the recommended naming scheme
2535     dir_contents.sort()
2536     for relname in dir_contents:
2537       fname = os.path.join(dir_name, relname)
2538       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2539           self.RE_MASK.match(relname) is not None):
2540         rrval = constants.HKR_SKIP
2541         output = ""
2542       else:
2543         result, output = self.ExecHook(fname, env)
2544         if not result:
2545           rrval = constants.HKR_FAIL
2546         else:
2547           rrval = constants.HKR_SUCCESS
2548       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2549
2550     return rr
2551
2552
2553 class IAllocatorRunner(object):
2554   """IAllocator runner.
2555
2556   This class is instantiated on the node side (ganeti-noded) and not on
2557   the master side.
2558
2559   """
2560   def Run(self, name, idata):
2561     """Run an iallocator script.
2562
2563     @type name: str
2564     @param name: the iallocator script name
2565     @type idata: str
2566     @param idata: the allocator input data
2567
2568     @rtype: tuple
2569     @return: two element tuple of:
2570        - status
2571        - either error message or stdout of allocator (for success)
2572
2573     """
2574     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2575                                   os.path.isfile)
2576     if alloc_script is None:
2577       _Fail("iallocator module '%s' not found in the search path", name)
2578
2579     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2580     try:
2581       os.write(fd, idata)
2582       os.close(fd)
2583       result = utils.RunCmd([alloc_script, fin_name])
2584       if result.failed:
2585         _Fail("iallocator module '%s' failed: %s, output '%s'",
2586               name, result.fail_reason, result.output)
2587     finally:
2588       os.unlink(fin_name)
2589
2590     return result.stdout
2591
2592
2593 class DevCacheManager(object):
2594   """Simple class for managing a cache of block device information.
2595
2596   """
2597   _DEV_PREFIX = "/dev/"
2598   _ROOT_DIR = constants.BDEV_CACHE_DIR
2599
2600   @classmethod
2601   def _ConvertPath(cls, dev_path):
2602     """Converts a /dev/name path to the cache file name.
2603
2604     This replaces slashes with underscores and strips the /dev
2605     prefix. It then returns the full path to the cache file.
2606
2607     @type dev_path: str
2608     @param dev_path: the C{/dev/} path name
2609     @rtype: str
2610     @return: the converted path name
2611
2612     """
2613     if dev_path.startswith(cls._DEV_PREFIX):
2614       dev_path = dev_path[len(cls._DEV_PREFIX):]
2615     dev_path = dev_path.replace("/", "_")
2616     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2617     return fpath
2618
2619   @classmethod
2620   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2621     """Updates the cache information for a given device.
2622
2623     @type dev_path: str
2624     @param dev_path: the pathname of the device
2625     @type owner: str
2626     @param owner: the owner (instance name) of the device
2627     @type on_primary: bool
2628     @param on_primary: whether this is the primary
2629         node nor not
2630     @type iv_name: str
2631     @param iv_name: the instance-visible name of the
2632         device, as in objects.Disk.iv_name
2633
2634     @rtype: None
2635
2636     """
2637     if dev_path is None:
2638       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2639       return
2640     fpath = cls._ConvertPath(dev_path)
2641     if on_primary:
2642       state = "primary"
2643     else:
2644       state = "secondary"
2645     if iv_name is None:
2646       iv_name = "not_visible"
2647     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2648     try:
2649       utils.WriteFile(fpath, data=fdata)
2650     except EnvironmentError, err:
2651       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2652
2653   @classmethod
2654   def RemoveCache(cls, dev_path):
2655     """Remove data for a dev_path.
2656
2657     This is just a wrapper over L{utils.RemoveFile} with a converted
2658     path name and logging.
2659
2660     @type dev_path: str
2661     @param dev_path: the pathname of the device
2662
2663     @rtype: None
2664
2665     """
2666     if dev_path is None:
2667       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2668       return
2669     fpath = cls._ConvertPath(dev_path)
2670     try:
2671       utils.RemoveFile(fpath)
2672     except EnvironmentError, err:
2673       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)