Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 3ecf6786

History | View | Annotate | Download (15.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31

    
32
from optparse import OptionParser
33

    
34

    
35
from ganeti import backend
36
from ganeti import logger
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import errors
40
from ganeti import ssconf
41

    
42
from twisted.spread import pb
43
from twisted.internet import reactor
44
from twisted.cred import checkers, portal
45
from OpenSSL import SSL
46

    
47

    
48
class ServerContextFactory:
49
  """SSL context factory class that uses a given certificate.
50

    
51
  """
52
  @staticmethod
53
  def getContext():
54
    """Return a customized context.
55

    
56
    The context will be set to use our certificate.
57

    
58
    """
59
    ctx = SSL.Context(SSL.TLSv1_METHOD)
60
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
61
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
62
    return ctx
63

    
64
class ServerObject(pb.Avatar):
65
  """The server implementation.
66

    
67
  This class holds all methods exposed over the RPC interface.
68

    
69
  """
70
  def __init__(self, name):
71
    self.name = name
72

    
73
  def perspectiveMessageReceived(self, broker, message, args, kw):
74
    """Custom message dispatching function.
75

    
76
    This function overrides the pb.Avatar function in order to provide
77
    a simple form of exception passing (as text only).
78

    
79
    """
80
    args = broker.unserialize(args, self)
81
    kw = broker.unserialize(kw, self)
82
    method = getattr(self, "perspective_%s" % message)
83
    tb = None
84
    state = None
85
    try:
86
      state = method(*args, **kw)
87
    except:
88
      tb = traceback.format_exc()
89

    
90
    return broker.serialize((tb, state), self, method, args, kw)
91

    
92
  # the new block devices  --------------------------
93

    
94
  @staticmethod
95
  def perspective_blockdev_create(params):
96
    """Create a block device.
97

    
98
    """
99
    bdev_s, size, on_primary, info = params
100
    bdev = objects.ConfigObject.Loads(bdev_s)
101
    if bdev is None:
102
      raise ValueError("can't unserialize data!")
103
    return backend.CreateBlockDevice(bdev, size, on_primary, info)
104

    
105
  @staticmethod
106
  def perspective_blockdev_remove(params):
107
    """Remove a block device.
108

    
109
    """
110
    bdev_s = params[0]
111
    bdev = objects.ConfigObject.Loads(bdev_s)
112
    return backend.RemoveBlockDevice(bdev)
113

    
114
  @staticmethod
115
  def perspective_blockdev_assemble(params):
116
    """Assemble a block device.
117

    
118
    """
119
    bdev_s, on_primary = params
120
    bdev = objects.ConfigObject.Loads(bdev_s)
121
    if bdev is None:
122
      raise ValueError("can't unserialize data!")
123
    return backend.AssembleBlockDevice(bdev, on_primary)
124

    
125
  @staticmethod
126
  def perspective_blockdev_shutdown(params):
127
    """Shutdown a block device.
128

    
129
    """
130
    bdev_s = params[0]
131
    bdev = objects.ConfigObject.Loads(bdev_s)
132
    if bdev is None:
133
      raise ValueError("can't unserialize data!")
134
    return backend.ShutdownBlockDevice(bdev)
135

    
136
  @staticmethod
137
  def perspective_blockdev_addchild(params):
138
    """Add a child to a mirror device.
139

    
140
    Note: this is only valid for mirror devices. It's the caller's duty
141
    to send a correct disk, otherwise we raise an error.
142

    
143
    """
144
    bdev_s, ndev_s = params
145
    bdev = objects.ConfigObject.Loads(bdev_s)
146
    ndev = objects.ConfigObject.Loads(ndev_s)
147
    if bdev is None or ndev is None:
148
      raise ValueError("can't unserialize data!")
149
    return backend.MirrorAddChild(bdev, ndev)
150

    
151
  @staticmethod
152
  def perspective_blockdev_removechild(params):
153
    """Remove a child from a mirror device.
154

    
155
    This is only valid for mirror devices, of course. It's the callers
156
    duty to send a correct disk, otherwise we raise an error.
157

    
158
    """
159
    bdev_s, ndev_s = params
160
    bdev = objects.ConfigObject.Loads(bdev_s)
161
    ndev = objects.ConfigObject.Loads(ndev_s)
162
    if bdev is None or ndev is None:
163
      raise ValueError("can't unserialize data!")
164
    return backend.MirrorRemoveChild(bdev, ndev)
165

    
166
  @staticmethod
167
  def perspective_blockdev_getmirrorstatus(params):
168
    """Return the mirror status for a list of disks.
169

    
170
    """
171
    disks = [objects.ConfigObject.Loads(dsk_s)
172
            for dsk_s in params]
173
    return backend.GetMirrorStatus(disks)
174

    
175
  @staticmethod
176
  def perspective_blockdev_find(params):
177
    """Expose the FindBlockDevice functionality for a disk.
178

    
179
    This will try to find but not activate a disk.
180

    
181
    """
182
    disk = objects.ConfigObject.Loads(params[0])
183
    return backend.FindBlockDevice(disk)
184

    
185
  @staticmethod
186
  def perspective_blockdev_snapshot(params):
187
    """Create a snapshot device.
188

    
189
    Note that this is only valid for LVM disks, if we get passed
190
    something else we raise an exception. The snapshot device can be
191
    remove by calling the generic block device remove call.
192

    
193
    """
194
    cfbd = objects.ConfigObject.Loads(params[0])
195
    return backend.SnapshotBlockDevice(cfbd)
196

    
197
  # export/import  --------------------------
198

    
199
  @staticmethod
200
  def perspective_snapshot_export(params):
201
    """Export a given snapshot.
202

    
203
    """
204
    disk = objects.ConfigObject.Loads(params[0])
205
    dest_node = params[1]
206
    instance = objects.ConfigObject.Loads(params[2])
207
    return backend.ExportSnapshot(disk, dest_node, instance)
208

    
209
  @staticmethod
210
  def perspective_finalize_export(params):
211
    """Expose the finalize export functionality.
212

    
213
    """
214
    instance = objects.ConfigObject.Loads(params[0])
215
    snap_disks = [objects.ConfigObject.Loads(str_data)
216
                  for str_data in params[1]]
217
    return backend.FinalizeExport(instance, snap_disks)
218

    
219
  @staticmethod
220
  def perspective_export_info(params):
221
    """Query information about an existing export on this node.
222

    
223
    The given path may not contain an export, in which case we return
224
    None.
225

    
226
    """
227
    path = params[0]
228
    einfo = backend.ExportInfo(path)
229
    if einfo is None:
230
      return einfo
231
    return einfo.Dumps()
232

    
233
  @staticmethod
234
  def perspective_export_list(params):
235
    """List the available exports on this node.
236

    
237
    Note that as opposed to export_info, which may query data about an
238
    export in any path, this only queries the standard Ganeti path
239
    (constants.EXPORT_DIR).
240

    
241
    """
242
    return backend.ListExports()
243

    
244
  @staticmethod
245
  def perspective_export_remove(params):
246
    """Remove an export.
247

    
248
    """
249
    export = params[0]
250
    return backend.RemoveExport(export)
251

    
252
  # volume  --------------------------
253

    
254
  @staticmethod
255
  def perspective_volume_list(params):
256
    """Query the list of logical volumes in a given volume group.
257

    
258
    """
259
    vgname = params[0]
260
    return backend.GetVolumeList(vgname)
261

    
262
  @staticmethod
263
  def perspective_vg_list(params):
264
    """Query the list of volume groups.
265

    
266
    """
267
    return backend.ListVolumeGroups()
268

    
269
  # bridge  --------------------------
270

    
271
  @staticmethod
272
  def perspective_bridges_exist(params):
273
    """Check if all bridges given exist on this node.
274

    
275
    """
276
    bridges_list = params[0]
277
    return backend.BridgesExist(bridges_list)
278

    
279
  # instance  --------------------------
280

    
281
  @staticmethod
282
  def perspective_instance_os_add(params):
283
    """Install an OS on a given instance.
284

    
285
    """
286
    inst_s, os_disk, swap_disk = params
287
    inst = objects.ConfigObject.Loads(inst_s)
288
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
289

    
290
  @staticmethod
291
  def perspective_instance_os_import(params):
292
    """Run the import function of an OS onto a given instance.
293

    
294
    """
295
    inst_s, os_disk, swap_disk, src_node, src_image = params
296
    inst = objects.ConfigObject.Loads(inst_s)
297
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
298
                                        src_node, src_image)
299

    
300
  @staticmethod
301
  def perspective_instance_shutdown(params):
302
    """Shutdown an instance.
303

    
304
    """
305
    instance = objects.ConfigObject.Loads(params[0])
306
    return backend.ShutdownInstance(instance)
307

    
308
  @staticmethod
309
  def perspective_instance_start(params):
310
    """Start an instance.
311

    
312
    """
313
    instance = objects.ConfigObject.Loads(params[0])
314
    extra_args = params[1]
315
    return backend.StartInstance(instance, extra_args)
316

    
317
  @staticmethod
318
  def perspective_instance_info(params):
319
    """Query instance information.
320

    
321
    """
322
    return backend.GetInstanceInfo(params[0])
323

    
324
  @staticmethod
325
  def perspective_all_instances_info(params):
326
    """Query information about all instances.
327

    
328
    """
329
    return backend.GetAllInstancesInfo()
330

    
331
  @staticmethod
332
  def perspective_instance_list(params):
333
    """Query the list of running instances.
334

    
335
    """
336
    return backend.GetInstanceList()
337

    
338
  # node --------------------------
339

    
340
  @staticmethod
341
  def perspective_node_info(params):
342
    """Query node information.
343

    
344
    """
345
    vgname = params[0]
346
    return backend.GetNodeInfo(vgname)
347

    
348
  @staticmethod
349
  def perspective_node_add(params):
350
    """Complete the registration of this node in the cluster.
351

    
352
    """
353
    return backend.AddNode(params[0], params[1], params[2],
354
                           params[3], params[4], params[5])
355

    
356
  @staticmethod
357
  def perspective_node_verify(params):
358
    """Run a verify sequence on this node.
359

    
360
    """
361
    return backend.VerifyNode(params[0])
362

    
363
  @staticmethod
364
  def perspective_node_start_master(params):
365
    """Promote this node to master status.
366

    
367
    """
368
    return backend.StartMaster()
369

    
370
  @staticmethod
371
  def perspective_node_stop_master(params):
372
    """Demote this node from master status.
373

    
374
    """
375
    return backend.StopMaster()
376

    
377
  @staticmethod
378
  def perspective_node_leave_cluster(params):
379
    """Cleanup after leaving a cluster.
380

    
381
    """
382
    return backend.LeaveCluster()
383

    
384
  @staticmethod
385
  def perspective_node_volumes(params):
386
    """Query the list of all logical volume groups.
387

    
388
    """
389
    return backend.NodeVolumes()
390

    
391
  # cluster --------------------------
392

    
393
  @staticmethod
394
  def perspective_version(params):
395
    """Query version information.
396

    
397
    """
398
    return constants.PROTOCOL_VERSION
399

    
400
  @staticmethod
401
  def perspective_upload_file(params):
402
    """Upload a file.
403

    
404
    Note that the backend implementation imposes strict rules on which
405
    files are accepted.
406

    
407
    """
408
    return backend.UploadFile(*params)
409

    
410

    
411
  # os -----------------------
412

    
413
  @staticmethod
414
  def perspective_os_diagnose(params):
415
    """Query detailed information about existing OSes.
416

    
417
    """
418
    os_list = backend.DiagnoseOS()
419
    if not os_list:
420
      # this catches also return values of 'False',
421
      # for which we can't iterate over
422
      return os_list
423
    result = []
424
    for data in os_list:
425
      if isinstance(data, objects.OS):
426
        result.append(data.Dumps())
427
      elif isinstance(data, errors.InvalidOS):
428
        result.append(data.args)
429
      else:
430
        raise errors.ProgrammerError("Invalid result from backend.DiagnoseOS"
431
                                     " (class %s, %s)" %
432
                                     (str(data.__class__), data))
433

    
434
    return result
435

    
436
  @staticmethod
437
  def perspective_os_get(params):
438
    """Query information about a given OS.
439

    
440
    """
441
    name = params[0]
442
    try:
443
      os_obj = backend.OSFromDisk(name).Dumps()
444
    except errors.InvalidOS, err:
445
      os_obj = err.args
446
    return os_obj
447

    
448
  # hooks -----------------------
449

    
450
  @staticmethod
451
  def perspective_hooks_runner(params):
452
    """Run hook scripts.
453

    
454
    """
455
    hpath, phase, env = params
456
    hr = backend.HooksRunner()
457
    return hr.RunHooks(hpath, phase, env)
458

    
459

    
460
class MyRealm:
461
  """Simple realm that forwards all requests to a ServerObject.
462

    
463
  """
464
  __implements__ = portal.IRealm
465

    
466
  def requestAvatar(self, avatarId, mind, *interfaces):
467
    """Return an avatar based on our ServerObject class.
468

    
469
    """
470
    if pb.IPerspective not in interfaces:
471
      raise NotImplementedError
472
    return pb.IPerspective, ServerObject(avatarId), lambda:None
473

    
474

    
475
def ParseOptions():
476
  """Parse the command line options.
477

    
478
  Returns:
479
    (options, args) as from OptionParser.parse_args()
480

    
481
  """
482
  parser = OptionParser(description="Ganeti node daemon",
483
                        usage="%prog [-f] [-d]",
484
                        version="%%prog (ganeti) %s" %
485
                        constants.RELEASE_VERSION)
486

    
487
  parser.add_option("-f", "--foreground", dest="fork",
488
                    help="Don't detach from the current terminal",
489
                    default=True, action="store_false")
490
  parser.add_option("-d", "--debug", dest="debug",
491
                    help="Enable some debug messages",
492
                    default=False, action="store_true")
493
  options, args = parser.parse_args()
494
  return options, args
495

    
496

    
497
def main():
498
  """Main function for the node daemon.
499

    
500
  """
501
  options, args = ParseOptions()
502
  for fname in (constants.SSL_CERT_FILE,):
503
    if not os.path.isfile(fname):
504
      print "config %s not there, will not run." % fname
505
      sys.exit(5)
506

    
507
  try:
508
    ss = ssconf.SimpleStore()
509
    port = ss.GetNodeDaemonPort()
510
    pwdata = ss.GetNodeDaemonPassword()
511
  except errors.ConfigurationError, err:
512
    print "Cluster configuration incomplete: '%s'" % str(err)
513
    sys.exit(5)
514

    
515
  # become a daemon
516
  if options.fork:
517
    createDaemon()
518

    
519
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
520
                      program="ganeti-noded")
521

    
522
  p = portal.Portal(MyRealm())
523
  p.registerChecker(
524
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
525
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
526
  reactor.run()
527

    
528

    
529
def createDaemon():
530
  """Detach a process from the controlling terminal and run it in the
531
  background as a daemon.
532

    
533
  """
534
  UMASK = 077
535
  WORKDIR = "/"
536
  # Default maximum for the number of available file descriptors.
537
  if 'SC_OPEN_MAX' in os.sysconf_names:
538
    try:
539
      MAXFD = os.sysconf('SC_OPEN_MAX')
540
      if MAXFD < 0:
541
        MAXFD = 1024
542
    except OSError:
543
      MAXFD = 1024
544
  else:
545
    MAXFD = 1024
546
  # The standard I/O file descriptors are redirected to /dev/null by default.
547
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
548
  REDIRECT_TO = constants.LOG_NODESERVER
549
  try:
550
    pid = os.fork()
551
  except OSError, e:
552
    raise Exception("%s [%d]" % (e.strerror, e.errno))
553
  if (pid == 0):  # The first child.
554
    os.setsid()
555
    try:
556
      pid = os.fork() # Fork a second child.
557
    except OSError, e:
558
      raise Exception("%s [%d]" % (e.strerror, e.errno))
559
    if (pid == 0):  # The second child.
560
      os.chdir(WORKDIR)
561
      os.umask(UMASK)
562
    else:
563
      # exit() or _exit()?  See below.
564
      os._exit(0) # Exit parent (the first child) of the second child.
565
  else:
566
    os._exit(0) # Exit parent of the first child.
567
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
568
  if (maxfd == resource.RLIM_INFINITY):
569
    maxfd = MAXFD
570

    
571
  # Iterate through and close all file descriptors.
572
  for fd in range(0, maxfd):
573
    try:
574
      os.close(fd)
575
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
576
      pass
577
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND) # standard input (0)
578
  # Duplicate standard input to standard output and standard error.
579
  os.dup2(0, 1)     # standard output (1)
580
  os.dup2(0, 2)     # standard error (2)
581
  return(0)
582

    
583

    
584
if __name__ == '__main__':
585
  main()