Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 098c0958

History | View | Annotate | Download (11.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
import os
25
import sys
26
import resource
27
import traceback
28

    
29
from optparse import OptionParser
30

    
31

    
32
from ganeti import backend
33
from ganeti import logger
34
from ganeti import constants
35
from ganeti import objects
36
from ganeti import errors
37
from ganeti import ssconf
38

    
39
from twisted.spread import pb
40
from twisted.internet import reactor
41
from twisted.cred import checkers, portal
42
from OpenSSL import SSL
43

    
44

    
45
class ServerContextFactory:
46
  def getContext(self):
47
    ctx = SSL.Context(SSL.TLSv1_METHOD)
48
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
49
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
50
    return ctx
51

    
52
class ServerObject(pb.Avatar):
53
  def __init__(self, name):
54
    self.name = name
55

    
56
  def perspectiveMessageReceived(self, broker, message, args, kw):
57
    """This method is called when a network message is received.
58

    
59
    I will call::
60

    
61
      |  self.perspective_%(message)s(*broker.unserialize(args),
62
      |                               **broker.unserialize(kw))
63

    
64
    to handle the method; subclasses of Avatar are expected to
65
    implement methods of this naming convention.
66

    
67
    """
68
    args = broker.unserialize(args, self)
69
    kw = broker.unserialize(kw, self)
70
    method = getattr(self, "perspective_%s" % message)
71
    tb = None
72
    state = None
73
    try:
74
      state = method(*args, **kw)
75
    except:
76
      tb = traceback.format_exc()
77

    
78
    return broker.serialize((tb, state), self, method, args, kw)
79

    
80
  # the new block devices  --------------------------
81

    
82
  def perspective_blockdev_create(self, params):
83
    bdev_s, size, on_primary = params
84
    bdev = objects.ConfigObject.Loads(bdev_s)
85
    if bdev is None:
86
      raise ValueError("can't unserialize data!")
87
    return backend.CreateBlockDevice(bdev, size, on_primary)
88

    
89

    
90
  def perspective_blockdev_remove(self, params):
91
    bdev_s = params[0]
92
    bdev = objects.ConfigObject.Loads(bdev_s)
93
    return backend.RemoveBlockDevice(bdev)
94

    
95

    
96
  def perspective_blockdev_assemble(self, params):
97
    bdev_s, on_primary = params
98
    bdev = objects.ConfigObject.Loads(bdev_s)
99
    if bdev is None:
100
      raise ValueError("can't unserialize data!")
101
    return backend.AssembleBlockDevice(bdev, on_primary)
102

    
103

    
104
  def perspective_blockdev_shutdown(self, params):
105
    bdev_s = params[0]
106
    bdev = objects.ConfigObject.Loads(bdev_s)
107
    if bdev is None:
108
      raise ValueError("can't unserialize data!")
109
    return backend.ShutdownBlockDevice(bdev)
110

    
111

    
112
  def perspective_blockdev_addchild(self, params):
113
    bdev_s, ndev_s = params
114
    bdev = objects.ConfigObject.Loads(bdev_s)
115
    ndev = objects.ConfigObject.Loads(ndev_s)
116
    if bdev is None or ndev is None:
117
      raise ValueError("can't unserialize data!")
118
    return backend.MirrorAddChild(bdev, ndev)
119

    
120

    
121
  def perspective_blockdev_removechild(self, params):
122
    bdev_s, ndev_s = params
123
    bdev = objects.ConfigObject.Loads(bdev_s)
124
    ndev = objects.ConfigObject.Loads(ndev_s)
125
    if bdev is None or ndev is None:
126
      raise ValueError("can't unserialize data!")
127
    return backend.MirrorRemoveChild(bdev, ndev)
128

    
129
  def perspective_blockdev_getmirrorstatus(self, params):
130
    disks = [objects.ConfigObject.Loads(dsk_s)
131
            for dsk_s in params]
132
    return backend.GetMirrorStatus(disks)
133

    
134
  def perspective_blockdev_find(self, params):
135
    disk = objects.ConfigObject.Loads(params[0])
136
    return backend.FindBlockDevice(disk)
137

    
138
  def perspective_blockdev_snapshot(self, params):
139
    cfbd = objects.ConfigObject.Loads(params[0])
140
    return backend.SnapshotBlockDevice(cfbd)
141

    
142
  # export/import  --------------------------
143

    
144
  def perspective_snapshot_export(self, params):
145
    disk = objects.ConfigObject.Loads(params[0])
146
    dest_node = params[1]
147
    instance = objects.ConfigObject.Loads(params[2])
148
    return backend.ExportSnapshot(disk,dest_node,instance)
149

    
150
  def perspective_finalize_export(self, params):
151
    instance = objects.ConfigObject.Loads(params[0])
152
    snap_disks = [objects.ConfigObject.Loads(str_data)
153
                  for str_data in params[1]]
154
    return backend.FinalizeExport(instance, snap_disks)
155

    
156
  def perspective_export_info(self, params):
157
    dir = params[0]
158
    einfo = backend.ExportInfo(dir)
159
    if einfo is None:
160
      return einfo
161
    return einfo.Dumps()
162

    
163
  def perspective_export_list(self, params):
164
    return backend.ListExports()
165

    
166
  def perspective_export_remove(self, params):
167
    export = params[0]
168
    return backend.RemoveExport(export)
169

    
170
  # volume  --------------------------
171

    
172
  def perspective_volume_list(self, params):
173
    vgname = params[0]
174
    return backend.GetVolumeList(vgname)
175

    
176
  def perspective_vg_list(self, params):
177
    return backend.ListVolumeGroups()
178

    
179
  # bridge  --------------------------
180

    
181
  def perspective_bridges_exist(self, params):
182
    bridges_list = params[0]
183
    return backend.BridgesExist(bridges_list)
184

    
185
  # instance  --------------------------
186

    
187
  def perspective_instance_os_add(self, params):
188
    inst_s, os_disk, swap_disk = params
189
    inst = objects.ConfigObject.Loads(inst_s)
190
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
191

    
192
  def perspective_instance_os_import(self, params):
193
    inst_s, os_disk, swap_disk, src_node, src_image = params
194
    inst = objects.ConfigObject.Loads(inst_s)
195
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
196
                                        src_node, src_image)
197

    
198
  def perspective_instance_shutdown(self, params):
199
    instance = objects.ConfigObject.Loads(params[0])
200
    return backend.ShutdownInstance(instance)
201

    
202
  def perspective_instance_start(self, params):
203
    instance = objects.ConfigObject.Loads(params[0])
204
    extra_args = params[1]
205
    return backend.StartInstance(instance, extra_args)
206

    
207
  def perspective_instance_info(self, params):
208
    return backend.GetInstanceInfo(params[0])
209

    
210
  def perspective_all_instances_info(self, params):
211
    return backend.GetAllInstancesInfo()
212

    
213
  def perspective_instance_list(self, params):
214
    return backend.GetInstanceList()
215

    
216
  # node --------------------------
217

    
218
  def perspective_node_info(self, params):
219
    vgname = params[0]
220
    return backend.GetNodeInfo(vgname)
221

    
222
  def perspective_node_add(self, params):
223
    return backend.AddNode(params[0], params[1], params[2],
224
                           params[3], params[4], params[5])
225

    
226
  def perspective_node_verify(self, params):
227
    return backend.VerifyNode(params[0])
228

    
229
  def perspective_node_start_master(self, params):
230
    return backend.StartMaster()
231

    
232
  def perspective_node_stop_master(self, params):
233
    return backend.StopMaster()
234

    
235
  def perspective_node_leave_cluster(self, params):
236
    return backend.LeaveCluster()
237

    
238
  def perspective_node_volumes(self, params):
239
    return backend.NodeVolumes()
240

    
241
  # cluster --------------------------
242

    
243
  def perspective_version(self, params):
244
    return constants.PROTOCOL_VERSION
245

    
246
  def perspective_upload_file(self, params):
247
    return backend.UploadFile(*params)
248

    
249

    
250
  # os -----------------------
251

    
252
  def perspective_os_diagnose(self, params):
253
    os_list = backend.DiagnoseOS()
254
    if not os_list:
255
      # this catches also return values of 'False',
256
      # for which we can't iterate over
257
      return os_list
258
    result = []
259
    for data in os_list:
260
      if isinstance(data, objects.OS):
261
        result.append(data.Dumps())
262
      elif isinstance(data, errors.InvalidOS):
263
        result.append(data.args)
264
      else:
265
        raise errors.ProgrammerError, ("Invalid result from backend.DiagnoseOS"
266
                                       " (class %s, %s)" %
267
                                       (str(data.__class__), data))
268

    
269
    return result
270

    
271
  def perspective_os_get(self, params):
272
    name = params[0]
273
    try:
274
      os = backend.OSFromDisk(name).Dumps()
275
    except errors.InvalidOS, err:
276
      os = err.args
277
    return os
278

    
279
  # hooks -----------------------
280

    
281
  def perspective_hooks_runner(self, params):
282
    hpath, phase, env = params
283
    hr = backend.HooksRunner()
284
    return hr.RunHooks(hpath, phase, env)
285

    
286

    
287
class MyRealm:
288
  __implements__ = portal.IRealm
289
  def requestAvatar(self, avatarId, mind, *interfaces):
290
    if pb.IPerspective not in interfaces:
291
      raise NotImplementedError
292
    return pb.IPerspective, ServerObject(avatarId), lambda:None
293

    
294

    
295
def ParseOptions():
296
  """Parse the command line options.
297

    
298
  Returns:
299
    (options, args) as from OptionParser.parse_args()
300

    
301
  """
302
  parser = OptionParser(description="Ganeti node daemon",
303
                        usage="%prog [-f] [-d]",
304
                        version="%%prog (ganeti) %s" %
305
                        constants.RELEASE_VERSION)
306

    
307
  parser.add_option("-f", "--foreground", dest="fork",
308
                    help="Don't detach from the current terminal",
309
                    default=True, action="store_false")
310
  parser.add_option("-d", "--debug", dest="debug",
311
                    help="Enable some debug messages",
312
                    default=False, action="store_true")
313
  options, args = parser.parse_args()
314
  return options, args
315

    
316

    
317
def main():
318
  options, args = ParseOptions()
319
  for fname in (constants.SSL_CERT_FILE,):
320
    if not os.path.isfile(fname):
321
      print "config %s not there, will not run." % fname
322
      sys.exit(5)
323

    
324
  try:
325
    ss = ssconf.SimpleStore()
326
    port = ss.GetNodeDaemonPort()
327
    pwdata = ss.GetNodeDaemonPassword()
328
  except errors.ConfigurationError, err:
329
    print "Cluster configuration incomplete: '%s'" % str(err)
330
    sys.exit(5)
331

    
332
  # become a daemon
333
  if options.fork:
334
    createDaemon()
335

    
336
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
337
                      program="ganeti-noded")
338

    
339
  p = portal.Portal(MyRealm())
340
  p.registerChecker(
341
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
342
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
343
  reactor.run()
344

    
345

    
346
def createDaemon():
347
  """Detach a process from the controlling terminal and run it in the
348
  background as a daemon.
349

    
350
  """
351
  UMASK = 077
352
  WORKDIR = "/"
353
  # Default maximum for the number of available file descriptors.
354
  if 'SC_OPEN_MAX' in os.sysconf_names:
355
    try:
356
      MAXFD = os.sysconf('SC_OPEN_MAX')
357
      if MAXFD < 0:
358
        MAXFD = 1024
359
    except OSError:
360
      MAXFD = 1024
361
  else:
362
    MAXFD = 1024
363
  # The standard I/O file descriptors are redirected to /dev/null by default.
364
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
365
  REDIRECT_TO = constants.LOG_NODESERVER
366
  try:
367
    pid = os.fork()
368
  except OSError, e:
369
    raise Exception, "%s [%d]" % (e.strerror, e.errno)
370
  if (pid == 0):  # The first child.
371
    os.setsid()
372
    try:
373
      pid = os.fork() # Fork a second child.
374
    except OSError, e:
375
      raise Exception, "%s [%d]" % (e.strerror, e.errno)
376
    if (pid == 0):  # The second child.
377
      os.chdir(WORKDIR)
378
      os.umask(UMASK)
379
    else:
380
      # exit() or _exit()?  See below.
381
      os._exit(0) # Exit parent (the first child) of the second child.
382
  else:
383
    os._exit(0) # Exit parent of the first child.
384
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
385
  if (maxfd == resource.RLIM_INFINITY):
386
    maxfd = MAXFD
387

    
388
  # Iterate through and close all file descriptors.
389
  for fd in range(0, maxfd):
390
    try:
391
      os.close(fd)
392
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
393
      pass
394
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND) # standard input (0)
395
  # Duplicate standard input to standard output and standard error.
396
  os.dup2(0, 1)     # standard output (1)
397
  os.dup2(0, 2)     # standard error (2)
398
  return(0)
399

    
400

    
401
if __name__=='__main__':
402
  main()