Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ a0c3fea1

History | View | Annotate | Download (11.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
import os
25
import sys
26
import resource
27
import traceback
28

    
29
from optparse import OptionParser
30

    
31

    
32
from ganeti import backend
33
from ganeti import logger
34
from ganeti import constants
35
from ganeti import objects
36
from ganeti import errors
37
from ganeti import ssconf
38

    
39
from twisted.spread import pb
40
from twisted.internet import reactor
41
from twisted.cred import checkers, portal
42
from OpenSSL import SSL
43

    
44

    
45
class ServerContextFactory:
46
  def getContext(self):
47
    ctx = SSL.Context(SSL.TLSv1_METHOD)
48
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
49
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
50
    return ctx
51

    
52
class ServerObject(pb.Avatar):
53
  def __init__(self, name):
54
    self.name = name
55

    
56
  def perspectiveMessageReceived(self, broker, message, args, kw):
57
    """This method is called when a network message is received.
58

    
59
    I will call::
60

    
61
      |  self.perspective_%(message)s(*broker.unserialize(args),
62
      |                               **broker.unserialize(kw))
63

    
64
    to handle the method; subclasses of Avatar are expected to
65
    implement methods of this naming convention.
66

    
67
    """
68
    args = broker.unserialize(args, self)
69
    kw = broker.unserialize(kw, self)
70
    method = getattr(self, "perspective_%s" % message)
71
    tb = None
72
    state = None
73
    try:
74
      state = method(*args, **kw)
75
    except:
76
      tb = traceback.format_exc()
77

    
78
    return broker.serialize((tb, state), self, method, args, kw)
79

    
80
  # the new block devices  --------------------------
81

    
82
  def perspective_blockdev_create(self, params):
83
    bdev_s, size, on_primary, info = params
84
    bdev = objects.ConfigObject.Loads(bdev_s)
85
    if bdev is None:
86
      raise ValueError("can't unserialize data!")
87
    return backend.CreateBlockDevice(bdev, size, on_primary, info)
88

    
89
  def perspective_blockdev_remove(self, params):
90
    bdev_s = params[0]
91
    bdev = objects.ConfigObject.Loads(bdev_s)
92
    return backend.RemoveBlockDevice(bdev)
93

    
94
  def perspective_blockdev_assemble(self, params):
95
    bdev_s, on_primary = params
96
    bdev = objects.ConfigObject.Loads(bdev_s)
97
    if bdev is None:
98
      raise ValueError("can't unserialize data!")
99
    return backend.AssembleBlockDevice(bdev, on_primary)
100

    
101
  def perspective_blockdev_shutdown(self, params):
102
    bdev_s = params[0]
103
    bdev = objects.ConfigObject.Loads(bdev_s)
104
    if bdev is None:
105
      raise ValueError("can't unserialize data!")
106
    return backend.ShutdownBlockDevice(bdev)
107

    
108
  def perspective_blockdev_addchild(self, params):
109
    bdev_s, ndev_s = params
110
    bdev = objects.ConfigObject.Loads(bdev_s)
111
    ndev = objects.ConfigObject.Loads(ndev_s)
112
    if bdev is None or ndev is None:
113
      raise ValueError("can't unserialize data!")
114
    return backend.MirrorAddChild(bdev, ndev)
115

    
116
  def perspective_blockdev_removechild(self, params):
117
    bdev_s, ndev_s = params
118
    bdev = objects.ConfigObject.Loads(bdev_s)
119
    ndev = objects.ConfigObject.Loads(ndev_s)
120
    if bdev is None or ndev is None:
121
      raise ValueError("can't unserialize data!")
122
    return backend.MirrorRemoveChild(bdev, ndev)
123

    
124
  def perspective_blockdev_getmirrorstatus(self, params):
125
    disks = [objects.ConfigObject.Loads(dsk_s)
126
            for dsk_s in params]
127
    return backend.GetMirrorStatus(disks)
128

    
129
  def perspective_blockdev_find(self, params):
130
    disk = objects.ConfigObject.Loads(params[0])
131
    return backend.FindBlockDevice(disk)
132

    
133
  def perspective_blockdev_snapshot(self, params):
134
    cfbd = objects.ConfigObject.Loads(params[0])
135
    return backend.SnapshotBlockDevice(cfbd)
136

    
137
  # export/import  --------------------------
138

    
139
  def perspective_snapshot_export(self, params):
140
    disk = objects.ConfigObject.Loads(params[0])
141
    dest_node = params[1]
142
    instance = objects.ConfigObject.Loads(params[2])
143
    return backend.ExportSnapshot(disk,dest_node,instance)
144

    
145
  def perspective_finalize_export(self, params):
146
    instance = objects.ConfigObject.Loads(params[0])
147
    snap_disks = [objects.ConfigObject.Loads(str_data)
148
                  for str_data in params[1]]
149
    return backend.FinalizeExport(instance, snap_disks)
150

    
151
  def perspective_export_info(self, params):
152
    dir = params[0]
153
    einfo = backend.ExportInfo(dir)
154
    if einfo is None:
155
      return einfo
156
    return einfo.Dumps()
157

    
158
  def perspective_export_list(self, params):
159
    return backend.ListExports()
160

    
161
  def perspective_export_remove(self, params):
162
    export = params[0]
163
    return backend.RemoveExport(export)
164

    
165
  # volume  --------------------------
166

    
167
  def perspective_volume_list(self, params):
168
    vgname = params[0]
169
    return backend.GetVolumeList(vgname)
170

    
171
  def perspective_vg_list(self, params):
172
    return backend.ListVolumeGroups()
173

    
174
  # bridge  --------------------------
175

    
176
  def perspective_bridges_exist(self, params):
177
    bridges_list = params[0]
178
    return backend.BridgesExist(bridges_list)
179

    
180
  # instance  --------------------------
181

    
182
  def perspective_instance_os_add(self, params):
183
    inst_s, os_disk, swap_disk = params
184
    inst = objects.ConfigObject.Loads(inst_s)
185
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
186

    
187
  def perspective_instance_os_import(self, params):
188
    inst_s, os_disk, swap_disk, src_node, src_image = params
189
    inst = objects.ConfigObject.Loads(inst_s)
190
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
191
                                        src_node, src_image)
192

    
193
  def perspective_instance_shutdown(self, params):
194
    instance = objects.ConfigObject.Loads(params[0])
195
    return backend.ShutdownInstance(instance)
196

    
197
  def perspective_instance_start(self, params):
198
    instance = objects.ConfigObject.Loads(params[0])
199
    extra_args = params[1]
200
    return backend.StartInstance(instance, extra_args)
201

    
202
  def perspective_instance_info(self, params):
203
    return backend.GetInstanceInfo(params[0])
204

    
205
  def perspective_all_instances_info(self, params):
206
    return backend.GetAllInstancesInfo()
207

    
208
  def perspective_instance_list(self, params):
209
    return backend.GetInstanceList()
210

    
211
  # node --------------------------
212

    
213
  def perspective_node_info(self, params):
214
    vgname = params[0]
215
    return backend.GetNodeInfo(vgname)
216

    
217
  def perspective_node_add(self, params):
218
    return backend.AddNode(params[0], params[1], params[2],
219
                           params[3], params[4], params[5])
220

    
221
  def perspective_node_verify(self, params):
222
    return backend.VerifyNode(params[0])
223

    
224
  def perspective_node_start_master(self, params):
225
    return backend.StartMaster()
226

    
227
  def perspective_node_stop_master(self, params):
228
    return backend.StopMaster()
229

    
230
  def perspective_node_leave_cluster(self, params):
231
    return backend.LeaveCluster()
232

    
233
  def perspective_node_volumes(self, params):
234
    return backend.NodeVolumes()
235

    
236
  # cluster --------------------------
237

    
238
  def perspective_version(self, params):
239
    return constants.PROTOCOL_VERSION
240

    
241
  def perspective_upload_file(self, params):
242
    return backend.UploadFile(*params)
243

    
244

    
245
  # os -----------------------
246

    
247
  def perspective_os_diagnose(self, params):
248
    os_list = backend.DiagnoseOS()
249
    if not os_list:
250
      # this catches also return values of 'False',
251
      # for which we can't iterate over
252
      return os_list
253
    result = []
254
    for data in os_list:
255
      if isinstance(data, objects.OS):
256
        result.append(data.Dumps())
257
      elif isinstance(data, errors.InvalidOS):
258
        result.append(data.args)
259
      else:
260
        raise errors.ProgrammerError, ("Invalid result from backend.DiagnoseOS"
261
                                       " (class %s, %s)" %
262
                                       (str(data.__class__), data))
263

    
264
    return result
265

    
266
  def perspective_os_get(self, params):
267
    name = params[0]
268
    try:
269
      os = backend.OSFromDisk(name).Dumps()
270
    except errors.InvalidOS, err:
271
      os = err.args
272
    return os
273

    
274
  # hooks -----------------------
275

    
276
  def perspective_hooks_runner(self, params):
277
    hpath, phase, env = params
278
    hr = backend.HooksRunner()
279
    return hr.RunHooks(hpath, phase, env)
280

    
281

    
282
class MyRealm:
283
  __implements__ = portal.IRealm
284
  def requestAvatar(self, avatarId, mind, *interfaces):
285
    if pb.IPerspective not in interfaces:
286
      raise NotImplementedError
287
    return pb.IPerspective, ServerObject(avatarId), lambda:None
288

    
289

    
290
def ParseOptions():
291
  """Parse the command line options.
292

    
293
  Returns:
294
    (options, args) as from OptionParser.parse_args()
295

    
296
  """
297
  parser = OptionParser(description="Ganeti node daemon",
298
                        usage="%prog [-f] [-d]",
299
                        version="%%prog (ganeti) %s" %
300
                        constants.RELEASE_VERSION)
301

    
302
  parser.add_option("-f", "--foreground", dest="fork",
303
                    help="Don't detach from the current terminal",
304
                    default=True, action="store_false")
305
  parser.add_option("-d", "--debug", dest="debug",
306
                    help="Enable some debug messages",
307
                    default=False, action="store_true")
308
  options, args = parser.parse_args()
309
  return options, args
310

    
311

    
312
def main():
313
  options, args = ParseOptions()
314
  for fname in (constants.SSL_CERT_FILE,):
315
    if not os.path.isfile(fname):
316
      print "config %s not there, will not run." % fname
317
      sys.exit(5)
318

    
319
  try:
320
    ss = ssconf.SimpleStore()
321
    port = ss.GetNodeDaemonPort()
322
    pwdata = ss.GetNodeDaemonPassword()
323
  except errors.ConfigurationError, err:
324
    print "Cluster configuration incomplete: '%s'" % str(err)
325
    sys.exit(5)
326

    
327
  # become a daemon
328
  if options.fork:
329
    createDaemon()
330

    
331
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
332
                      program="ganeti-noded")
333

    
334
  p = portal.Portal(MyRealm())
335
  p.registerChecker(
336
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
337
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
338
  reactor.run()
339

    
340

    
341
def createDaemon():
342
  """Detach a process from the controlling terminal and run it in the
343
  background as a daemon.
344

    
345
  """
346
  UMASK = 077
347
  WORKDIR = "/"
348
  # Default maximum for the number of available file descriptors.
349
  if 'SC_OPEN_MAX' in os.sysconf_names:
350
    try:
351
      MAXFD = os.sysconf('SC_OPEN_MAX')
352
      if MAXFD < 0:
353
        MAXFD = 1024
354
    except OSError:
355
      MAXFD = 1024
356
  else:
357
    MAXFD = 1024
358
  # The standard I/O file descriptors are redirected to /dev/null by default.
359
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
360
  REDIRECT_TO = constants.LOG_NODESERVER
361
  try:
362
    pid = os.fork()
363
  except OSError, e:
364
    raise Exception, "%s [%d]" % (e.strerror, e.errno)
365
  if (pid == 0):  # The first child.
366
    os.setsid()
367
    try:
368
      pid = os.fork() # Fork a second child.
369
    except OSError, e:
370
      raise Exception, "%s [%d]" % (e.strerror, e.errno)
371
    if (pid == 0):  # The second child.
372
      os.chdir(WORKDIR)
373
      os.umask(UMASK)
374
    else:
375
      # exit() or _exit()?  See below.
376
      os._exit(0) # Exit parent (the first child) of the second child.
377
  else:
378
    os._exit(0) # Exit parent of the first child.
379
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
380
  if (maxfd == resource.RLIM_INFINITY):
381
    maxfd = MAXFD
382

    
383
  # Iterate through and close all file descriptors.
384
  for fd in range(0, maxfd):
385
    try:
386
      os.close(fd)
387
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
388
      pass
389
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND) # standard input (0)
390
  # Duplicate standard input to standard output and standard error.
391
  os.dup2(0, 1)     # standard output (1)
392
  os.dup2(0, 2)     # standard error (2)
393
  return(0)
394

    
395

    
396
if __name__=='__main__':
397
  main()