Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ cb91d46e

History | View | Annotate | Download (11.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
import os
25
import sys
26
import resource
27
import traceback
28

    
29
from optparse import OptionParser
30

    
31

    
32
from ganeti import backend
33
from ganeti import logger
34
from ganeti import constants
35
from ganeti import objects
36
from ganeti import errors
37
from ganeti import ssconf
38

    
39
from twisted.spread import pb
40
from twisted.internet import reactor
41
from twisted.cred import checkers, portal
42
from OpenSSL import SSL
43

    
44

    
45
class ServerContextFactory:
46
  def getContext(self):
47
    ctx = SSL.Context(SSL.TLSv1_METHOD)
48
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
49
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
50
    return ctx
51

    
52
class ServerObject(pb.Avatar):
53
  def __init__(self, name):
54
    self.name = name
55

    
56
  def perspectiveMessageReceived(self, broker, message, args, kw):
57
    """This method is called when a network message is received.
58

    
59
    I will call::
60

    
61
      |  self.perspective_%(message)s(*broker.unserialize(args),
62
      |                               **broker.unserialize(kw))
63

    
64
    to handle the method; subclasses of Avatar are expected to
65
    implement methods of this naming convention.
66
    """
67

    
68
    args = broker.unserialize(args, self)
69
    kw = broker.unserialize(kw, self)
70
    method = getattr(self, "perspective_%s" % message)
71
    tb = None
72
    state = None
73
    try:
74
      state = method(*args, **kw)
75
    except:
76
      tb = traceback.format_exc()
77

    
78
    return broker.serialize((tb, state), self, method, args, kw)
79

    
80
  # the new block devices  --------------------------
81

    
82
  def perspective_blockdev_create(self, params):
83
    bdev_s, size, on_primary = params
84
    bdev = objects.ConfigObject.Loads(bdev_s)
85
    if bdev is None:
86
      raise ValueError("can't unserialize data!")
87
    return backend.CreateBlockDevice(bdev, size, on_primary)
88

    
89

    
90
  def perspective_blockdev_remove(self, params):
91
    bdev_s = params[0]
92
    bdev = objects.ConfigObject.Loads(bdev_s)
93
    return backend.RemoveBlockDevice(bdev)
94

    
95

    
96
  def perspective_blockdev_assemble(self, params):
97
    bdev_s, on_primary = params
98
    bdev = objects.ConfigObject.Loads(bdev_s)
99
    if bdev is None:
100
      raise ValueError("can't unserialize data!")
101
    return backend.AssembleBlockDevice(bdev, on_primary)
102

    
103

    
104
  def perspective_blockdev_shutdown(self, params):
105
    bdev_s = params[0]
106
    bdev = objects.ConfigObject.Loads(bdev_s)
107
    if bdev is None:
108
      raise ValueError("can't unserialize data!")
109
    return backend.ShutdownBlockDevice(bdev)
110

    
111

    
112
  def perspective_blockdev_addchild(self, params):
113
    bdev_s, ndev_s = params
114
    bdev = objects.ConfigObject.Loads(bdev_s)
115
    ndev = objects.ConfigObject.Loads(ndev_s)
116
    if bdev is None or ndev is None:
117
      raise ValueError("can't unserialize data!")
118
    return backend.MirrorAddChild(bdev, ndev)
119

    
120

    
121
  def perspective_blockdev_removechild(self, params):
122
    bdev_s, ndev_s = params
123
    bdev = objects.ConfigObject.Loads(bdev_s)
124
    ndev = objects.ConfigObject.Loads(ndev_s)
125
    if bdev is None or ndev is None:
126
      raise ValueError("can't unserialize data!")
127
    return backend.MirrorRemoveChild(bdev, ndev)
128

    
129
  def perspective_blockdev_getmirrorstatus(self, params):
130
    disks = [objects.ConfigObject.Loads(dsk_s)
131
            for dsk_s in params]
132
    return backend.GetMirrorStatus(disks)
133

    
134
  def perspective_blockdev_find(self, params):
135
    disk = objects.ConfigObject.Loads(params[0])
136
    return backend.FindBlockDevice(disk)
137

    
138
  def perspective_blockdev_snapshot(self, params):
139
    cfbd = objects.ConfigObject.Loads(params[0])
140
    return backend.SnapshotBlockDevice(cfbd)
141

    
142
  # export/import  --------------------------
143

    
144
  def perspective_snapshot_export(self, params):
145
    disk = objects.ConfigObject.Loads(params[0])
146
    dest_node = params[1]
147
    instance = objects.ConfigObject.Loads(params[2])
148
    return backend.ExportSnapshot(disk,dest_node,instance)
149

    
150
  def perspective_finalize_export(self, params):
151
    instance = objects.ConfigObject.Loads(params[0])
152
    snap_disks = [objects.ConfigObject.Loads(str_data)
153
                  for str_data in params[1]]
154
    return backend.FinalizeExport(instance, snap_disks)
155

    
156
  def perspective_export_info(self, params):
157
    dir = params[0]
158
    einfo = backend.ExportInfo(dir)
159
    if einfo is None:
160
      return einfo
161
    return einfo.Dumps()
162

    
163
  def perspective_export_list(self, params):
164
    return backend.ListExports()
165

    
166
  def perspective_export_remove(self, params):
167
    export = params[0]
168
    return backend.RemoveExport(export)
169

    
170
  # volume  --------------------------
171

    
172
  def perspective_volume_list(self, params):
173
    vgname = params[0]
174
    return backend.GetVolumeList(vgname)
175

    
176
  def perspective_vg_list(self, params):
177
    return backend.ListVolumeGroups()
178

    
179
  # bridge  --------------------------
180

    
181
  def perspective_bridges_exist(self, params):
182
    bridges_list = params[0]
183
    return backend.BridgesExist(bridges_list)
184

    
185
  # instance  --------------------------
186

    
187
  def perspective_instance_os_add(self, params):
188
    inst_s, os_disk, swap_disk = params
189
    inst = objects.ConfigObject.Loads(inst_s)
190
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
191

    
192
  def perspective_instance_os_import(self, params):
193
    inst_s, os_disk, swap_disk, src_node, src_image = params
194
    inst = objects.ConfigObject.Loads(inst_s)
195
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
196
                                        src_node, src_image)
197

    
198
  def perspective_instance_shutdown(self, params):
199
    instance = objects.ConfigObject.Loads(params[0])
200
    return backend.ShutdownInstance(instance)
201

    
202
  def perspective_instance_start(self, params):
203
    instance = objects.ConfigObject.Loads(params[0])
204
    extra_args = params[1]
205
    return backend.StartInstance(instance, extra_args)
206

    
207
  def perspective_instance_info(self, params):
208
    return backend.GetInstanceInfo(params[0])
209

    
210
  def perspective_all_instances_info(self, params):
211
    return backend.GetAllInstancesInfo()
212

    
213
  def perspective_instance_list(self, params):
214
    return backend.GetInstanceList()
215

    
216
  # node --------------------------
217

    
218
  def perspective_node_info(self, params):
219
    vgname = params[0]
220
    return backend.GetNodeInfo(vgname)
221

    
222
  def perspective_node_add(self, params):
223
    return backend.AddNode(params[0], params[1], params[2],
224
                           params[3], params[4], params[5])
225

    
226
  def perspective_node_verify(self, params):
227
    return backend.VerifyNode(params[0])
228

    
229
  def perspective_node_start_master(self, params):
230
    return backend.StartMaster()
231

    
232
  def perspective_node_stop_master(self, params):
233
    return backend.StopMaster()
234

    
235
  def perspective_node_leave_cluster(self, params):
236
    return backend.LeaveCluster()
237

    
238
  def perspective_node_volumes(self, params):
239
    return backend.NodeVolumes()
240

    
241
  # cluster --------------------------
242

    
243
  def perspective_version(self, params):
244
    return constants.PROTOCOL_VERSION
245

    
246
  def perspective_upload_file(self, params):
247
    return backend.UploadFile(*params)
248

    
249

    
250
  # os -----------------------
251

    
252
  def perspective_os_diagnose(self, params):
253
    os_list = backend.DiagnoseOS()
254
    if not os_list:
255
      # this catches also return values of 'False',
256
      # for which we can't iterate over
257
      return os_list
258
    result = []
259
    for data in os_list:
260
      if isinstance(data, objects.OS):
261
        result.append(data.Dumps())
262
      elif isinstance(data, errors.InvalidOS):
263
        result.append(data.args)
264
      else:
265
        raise errors.ProgrammerError, ("Invalid result from backend.DiagnoseOS"
266
                                       " (class %s, %s)" %
267
                                       (str(data.__class__), data))
268

    
269
    return result
270

    
271
  def perspective_os_get(self, params):
272
    name = params[0]
273
    try:
274
      os = backend.OSFromDisk(name).Dumps()
275
    except errors.InvalidOS, err:
276
      os = err.args
277
    return os
278

    
279
  # hooks -----------------------
280

    
281
  def perspective_hooks_runner(self, params):
282
    hpath, phase, env = params
283
    hr = backend.HooksRunner()
284
    return hr.RunHooks(hpath, phase, env)
285

    
286

    
287
class MyRealm:
288
  __implements__ = portal.IRealm
289
  def requestAvatar(self, avatarId, mind, *interfaces):
290
    if pb.IPerspective not in interfaces:
291
      raise NotImplementedError
292
    return pb.IPerspective, ServerObject(avatarId), lambda:None
293

    
294

    
295
def ParseOptions():
296
  """Parse the command line options.
297

    
298
  Returns:
299
    (options, args) as from OptionParser.parse_args()
300

    
301
  """
302
  parser = OptionParser(description="Ganeti node daemon",
303
                        usage="%prog [-f] [-d]",
304
                        version="%%prog (ganeti) %s" %
305
                        constants.RELEASE_VERSION)
306

    
307
  parser.add_option("-f", "--foreground", dest="fork",
308
                    help="Don't detach from the current terminal",
309
                    default=True, action="store_false")
310
  parser.add_option("-d", "--debug", dest="debug",
311
                    help="Enable some debug messages",
312
                    default=False, action="store_true")
313
  options, args = parser.parse_args()
314
  return options, args
315

    
316

    
317
def main():
318
  options, args = ParseOptions()
319
  for fname in (constants.SSL_CERT_FILE,):
320
    if not os.path.isfile(fname):
321
      print "config %s not there, will not run." % fname
322
      sys.exit(5)
323

    
324
  try:
325
    ss = ssconf.SimpleStore()
326
    port = ss.GetNodeDaemonPort()
327
    pwdata = ss.GetNodeDaemonPassword()
328
  except errors.ConfigurationError, err:
329
    print "Cluster configuration incomplete: '%s'" % str(err)
330
    sys.exit(5)
331

    
332
  # become a daemon
333
  if options.fork:
334
    createDaemon()
335

    
336
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
337
                      program="ganeti-noded")
338

    
339
  p = portal.Portal(MyRealm())
340
  p.registerChecker(
341
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
342
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
343
  reactor.run()
344

    
345

    
346
def createDaemon():
347
  """Detach a process from the controlling terminal and run it in the
348
  background as a daemon.
349
  """
350
  UMASK = 077
351
  WORKDIR = "/"
352
  # Default maximum for the number of available file descriptors.
353
  if 'SC_OPEN_MAX' in os.sysconf_names:
354
    try:
355
      MAXFD = os.sysconf('SC_OPEN_MAX')
356
      if MAXFD < 0:
357
        MAXFD = 1024
358
    except OSError:
359
      MAXFD = 1024
360
  else:
361
    MAXFD = 1024
362
  # The standard I/O file descriptors are redirected to /dev/null by default.
363
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
364
  REDIRECT_TO = constants.LOG_NODESERVER
365
  try:
366
    pid = os.fork()
367
  except OSError, e:
368
    raise Exception, "%s [%d]" % (e.strerror, e.errno)
369
  if (pid == 0):  # The first child.
370
    os.setsid()
371
    try:
372
      pid = os.fork() # Fork a second child.
373
    except OSError, e:
374
      raise Exception, "%s [%d]" % (e.strerror, e.errno)
375
    if (pid == 0):  # The second child.
376
      os.chdir(WORKDIR)
377
      os.umask(UMASK)
378
    else:
379
      # exit() or _exit()?  See below.
380
      os._exit(0) # Exit parent (the first child) of the second child.
381
  else:
382
    os._exit(0) # Exit parent of the first child.
383
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
384
  if (maxfd == resource.RLIM_INFINITY):
385
    maxfd = MAXFD
386

    
387
  # Iterate through and close all file descriptors.
388
  for fd in range(0, maxfd):
389
    try:
390
      os.close(fd)
391
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
392
      pass
393
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND) # standard input (0)
394
  # Duplicate standard input to standard output and standard error.
395
  os.dup2(0, 1)     # standard output (1)
396
  os.dup2(0, 2)     # standard error (2)
397
  return(0)
398

    
399

    
400
if __name__=='__main__':
401
  main()