Initial commit.
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 import os
25 import sys
26 import resource
27 import traceback
28
29 from optparse import OptionParser
30
31
32 from ganeti import backend
33 from ganeti import logger
34 from ganeti import constants
35 from ganeti import objects
36 from ganeti import errors
37 from ganeti import ssconf
38
39 from twisted.spread import pb
40 from twisted.internet import reactor
41 from twisted.cred import checkers, portal
42 from OpenSSL import SSL
43
44
45 class ServerContextFactory:
46   def getContext(self):
47     ctx = SSL.Context(SSL.TLSv1_METHOD)
48     ctx.use_certificate_file(constants.SSL_CERT_FILE)
49     ctx.use_privatekey_file(constants.SSL_CERT_FILE)
50     return ctx
51
52 class ServerObject(pb.Avatar):
53   def __init__(self, name):
54     self.name = name
55
56   def perspectiveMessageReceived(self, broker, message, args, kw):
57     """This method is called when a network message is received.
58
59     I will call::
60
61       |  self.perspective_%(message)s(*broker.unserialize(args),
62       |                               **broker.unserialize(kw))
63
64     to handle the method; subclasses of Avatar are expected to
65     implement methods of this naming convention.
66     """
67
68     args = broker.unserialize(args, self)
69     kw = broker.unserialize(kw, self)
70     method = getattr(self, "perspective_%s" % message)
71     tb = None
72     state = None
73     try:
74       state = method(*args, **kw)
75     except:
76       tb = traceback.format_exc()
77
78     return broker.serialize((tb, state), self, method, args, kw)
79
80   # the new block devices  --------------------------
81
82   def perspective_blockdev_create(self,params):
83     bdev_s, size, on_primary = params
84     bdev = objects.ConfigObject.Loads(bdev_s)
85     if bdev is None:
86       raise ValueError("can't unserialize data!")
87     return backend.CreateBlockDevice(bdev, size, on_primary)
88
89
90   def perspective_blockdev_remove(self,params):
91     bdev_s = params[0]
92     bdev = objects.ConfigObject.Loads(bdev_s)
93     return backend.RemoveBlockDevice(bdev)
94
95
96   def perspective_blockdev_assemble(self,params):
97     bdev_s, on_primary = params
98     bdev = objects.ConfigObject.Loads(bdev_s)
99     if bdev is None:
100       raise ValueError("can't unserialize data!")
101     return backend.AssembleBlockDevice(bdev, on_primary)
102
103
104   def perspective_blockdev_shutdown(self,params):
105     bdev_s = params[0]
106     bdev = objects.ConfigObject.Loads(bdev_s)
107     if bdev is None:
108       raise ValueError("can't unserialize data!")
109     return backend.ShutdownBlockDevice(bdev)
110
111
112   def perspective_blockdev_addchild(self,params):
113     bdev_s, ndev_s = params
114     bdev = objects.ConfigObject.Loads(bdev_s)
115     ndev = objects.ConfigObject.Loads(ndev_s)
116     if bdev is None or ndev is None:
117       raise ValueError("can't unserialize data!")
118     return backend.MirrorAddChild(bdev, ndev)
119
120
121   def perspective_blockdev_removechild(self,params):
122     bdev_s, ndev_s = params
123     bdev = objects.ConfigObject.Loads(bdev_s)
124     ndev = objects.ConfigObject.Loads(ndev_s)
125     if bdev is None or ndev is None:
126       raise ValueError("can't unserialize data!")
127     return backend.MirrorRemoveChild(bdev, ndev)
128
129   def perspective_blockdev_getmirrorstatus(self, params):
130     disks = [objects.ConfigObject.Loads(dsk_s)
131             for dsk_s in params]
132     return backend.GetMirrorStatus(disks)
133
134   def perspective_blockdev_find(self, params):
135     disk = objects.ConfigObject.Loads(params[0])
136     return backend.FindBlockDevice(disk)
137
138   def perspective_blockdev_snapshot(self,params):
139     cfbd = objects.ConfigObject.Loads(params[0])
140     return backend.SnapshotBlockDevice(cfbd)
141
142   # export/import  --------------------------
143
144   def perspective_snapshot_export(self,params):
145     disk = objects.ConfigObject.Loads(params[0])
146     dest_node = params[1]
147     instance = objects.ConfigObject.Loads(params[2])
148     return backend.ExportSnapshot(disk,dest_node,instance)
149
150   def perspective_finalize_export(self,params):
151     instance = objects.ConfigObject.Loads(params[0])
152     snap_disks = [objects.ConfigObject.Loads(str_data)
153                   for str_data in params[1]]
154     return backend.FinalizeExport(instance, snap_disks)
155
156   def perspective_export_info(self,params):
157     dir = params[0]
158     einfo = backend.ExportInfo(dir)
159     if einfo is None:
160       return einfo
161     return einfo.Dumps()
162
163   def perspective_export_list(self, params):
164     return backend.ListExports()
165
166   def perspective_export_remove(self, params):
167     export = params[0]
168     return backend.RemoveExport(export)
169
170   # volume  --------------------------
171
172   def perspective_volume_list(self,params):
173     vgname = params[0]
174     return backend.GetVolumeList(vgname)
175
176   def perspective_vg_list(self,params):
177     return backend.ListVolumeGroups()
178
179   # bridge  --------------------------
180
181   def perspective_bridges_exist(self,params):
182     bridges_list = params[0]
183     return backend.BridgesExist(bridges_list)
184
185   # instance  --------------------------
186
187   def perspective_instance_os_add(self,params):
188     inst_s, os_disk, swap_disk = params
189     inst = objects.ConfigObject.Loads(inst_s)
190     return backend.AddOSToInstance(inst, os_disk, swap_disk)
191
192   def perspective_instance_os_import(self, params):
193     inst_s, os_disk, swap_disk, src_node, src_image = params
194     inst = objects.ConfigObject.Loads(inst_s)
195     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
196                                         src_node, src_image)
197
198   def perspective_instance_shutdown(self,params):
199     instance = objects.ConfigObject.Loads(params[0])
200     return backend.ShutdownInstance(instance)
201
202   def perspective_instance_start(self,params):
203     instance = objects.ConfigObject.Loads(params[0])
204     extra_args = params[1]
205     return backend.StartInstance(instance, extra_args)
206
207   def perspective_instance_info(self,params):
208     return backend.GetInstanceInfo(params[0])
209
210   def perspective_all_instances_info(self,params):
211     return backend.GetAllInstancesInfo()
212
213   def perspective_instance_list(self,params):
214     return backend.GetInstanceList()
215
216   # node --------------------------
217
218   def perspective_node_info(self,params):
219     vgname = params[0]
220     return backend.GetNodeInfo(vgname)
221
222   def perspective_node_add(self,params):
223     return backend.AddNode(params[0], params[1], params[2],
224                            params[3], params[4], params[5])
225
226   def perspective_node_verify(self,params):
227     return backend.VerifyNode(params[0])
228
229   def perspective_node_start_master(self, params):
230     return backend.StartMaster()
231
232   def perspective_node_stop_master(self, params):
233     return backend.StopMaster()
234
235   def perspective_node_leave_cluster(self, params):
236     return backend.LeaveCluster()
237
238   # cluster --------------------------
239
240   def perspective_version(self,params):
241     return constants.PROTOCOL_VERSION
242
243   def perspective_configfile_list(self,params):
244     return backend.ListConfigFiles()
245
246   def perspective_upload_file(self,params):
247     return backend.UploadFile(*params)
248
249
250   # os -----------------------
251
252   def perspective_os_diagnose(self, params):
253     os_list = backend.DiagnoseOS()
254     if not os_list:
255       # this catches also return values of 'False',
256       # for which we can't iterate over
257       return os_list
258     result = []
259     for data in os_list:
260       if isinstance(data, objects.OS):
261         result.append(data.Dumps())
262       elif isinstance(data, errors.InvalidOS):
263         result.append(data.args)
264       else:
265         raise errors.ProgrammerError, ("Invalid result from backend.DiagnoseOS"
266                                        " (class %s, %s)" %
267                                        (str(data.__class__), data))
268
269     return result
270
271   def perspective_os_get(self, params):
272     name = params[0]
273     try:
274       os = backend.OSFromDisk(name).Dumps()
275     except errors.InvalidOS, err:
276       os = err.args
277     return os
278
279   # hooks -----------------------
280
281   def perspective_hooks_runner(self, params):
282     hpath, phase, env = params
283     hr = backend.HooksRunner()
284     return hr.RunHooks(hpath, phase, env)
285
286
287 class MyRealm:
288   __implements__ = portal.IRealm
289   def requestAvatar(self, avatarId, mind, *interfaces):
290     if pb.IPerspective not in interfaces:
291       raise NotImplementedError
292     return pb.IPerspective, ServerObject(avatarId), lambda:None
293
294
295 def ParseOptions():
296   """Parse the command line options.
297
298   Returns:
299     (options, args) as from OptionParser.parse_args()
300
301   """
302   parser = OptionParser(description="Ganeti node daemon",
303                         usage="%prog [-f] [-d]",
304                         version="%%prog (ganeti) %s" %
305                         constants.RELEASE_VERSION)
306
307   parser.add_option("-f", "--foreground", dest="fork",
308                     help="Don't detach from the current terminal",
309                     default=True, action="store_false")
310   parser.add_option("-d", "--debug", dest="debug",
311                     help="Enable some debug messages",
312                     default=False, action="store_true")
313   options, args = parser.parse_args()
314   return options, args
315
316
317 def main():
318   options, args = ParseOptions()
319   for fname in (constants.SSL_CERT_FILE,):
320     if not os.path.isfile(fname):
321       print "config %s not there, will not run." % fname
322       sys.exit(5)
323
324   try:
325     ss = ssconf.SimpleStore()
326     port = ss.GetNodeDaemonPort()
327     pwdata = ss.GetNodeDaemonPassword()
328   except errors.ConfigurationError, err:
329     print "Cluster configuration incomplete: '%s'" % str(err)
330     sys.exit(5)
331
332   # become a daemon
333   if options.fork:
334     createDaemon()
335
336   logger.SetupLogging(twisted_workaround=True, debug=options.debug,
337                       program="ganeti-noded")
338
339   p = portal.Portal(MyRealm())
340   p.registerChecker(
341     checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
342   reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
343   reactor.run()
344
345
346 def createDaemon():
347   """Detach a process from the controlling terminal and run it in the
348   background as a daemon.
349   """
350   UMASK = 077
351   WORKDIR = "/"
352   # Default maximum for the number of available file descriptors.
353   if 'SC_OPEN_MAX' in os.sysconf_names:
354     try:
355       MAXFD = os.sysconf('SC_OPEN_MAX')
356       if MAXFD < 0:
357         MAXFD = 1024
358     except OSError:
359       MAXFD = 1024
360   else:
361     MAXFD = 1024
362   # The standard I/O file descriptors are redirected to /dev/null by default.
363   #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
364   REDIRECT_TO = constants.LOG_NODESERVER
365   try:
366     pid = os.fork()
367   except OSError, e:
368     raise Exception, "%s [%d]" % (e.strerror, e.errno)
369   if (pid == 0):        # The first child.
370     os.setsid()
371     try:
372       pid = os.fork()   # Fork a second child.
373     except OSError, e:
374       raise Exception, "%s [%d]" % (e.strerror, e.errno)
375     if (pid == 0):      # The second child.
376       os.chdir(WORKDIR)
377       os.umask(UMASK)
378     else:
379       # exit() or _exit()?  See below.
380       os._exit(0)       # Exit parent (the first child) of the second child.
381   else:
382     os._exit(0) # Exit parent of the first child.
383   maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
384   if (maxfd == resource.RLIM_INFINITY):
385     maxfd = MAXFD
386
387   # Iterate through and close all file descriptors.
388   for fd in range(0, maxfd):
389     try:
390       os.close(fd)
391     except OSError:     # ERROR, fd wasn't open to begin with (ignored)
392       pass
393   os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND) # standard input (0)
394   # Duplicate standard input to standard output and standard error.
395   os.dup2(0, 1)                 # standard output (1)
396   os.dup2(0, 2)                 # standard error (2)
397   return(0)
398
399
400 if __name__=='__main__':
401   main()