Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 319856a9

History | View | Annotate | Download (15.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31

    
32
from optparse import OptionParser
33

    
34

    
35
from ganeti import backend
36
from ganeti import logger
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import errors
40
from ganeti import ssconf
41

    
42
from twisted.spread import pb
43
from twisted.internet import reactor
44
from twisted.cred import checkers, portal
45
from OpenSSL import SSL
46

    
47

    
48
class ServerContextFactory:
49
  """SSL context factory class that uses a given certificate.
50

    
51
  """
52
  @staticmethod
53
  def getContext():
54
    """Return a customized context.
55

    
56
    The context will be set to use our certificate.
57

    
58
    """
59
    ctx = SSL.Context(SSL.TLSv1_METHOD)
60
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
61
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
62
    return ctx
63

    
64
class ServerObject(pb.Avatar):
65
  """The server implementation.
66

    
67
  This class holds all methods exposed over the RPC interface.
68

    
69
  """
70
  def __init__(self, name):
71
    self.name = name
72

    
73
  def perspectiveMessageReceived(self, broker, message, args, kw):
74
    """Custom message dispatching function.
75

    
76
    This function overrides the pb.Avatar function in order to provide
77
    a simple form of exception passing (as text only).
78

    
79
    """
80
    args = broker.unserialize(args, self)
81
    kw = broker.unserialize(kw, self)
82
    method = getattr(self, "perspective_%s" % message)
83
    tb = None
84
    state = None
85
    try:
86
      state = method(*args, **kw)
87
    except:
88
      tb = traceback.format_exc()
89

    
90
    return broker.serialize((tb, state), self, method, args, kw)
91

    
92
  # the new block devices  --------------------------
93

    
94
  @staticmethod
95
  def perspective_blockdev_create(params):
96
    """Create a block device.
97

    
98
    """
99
    bdev_s, size, on_primary, info = params
100
    bdev = objects.Disk.FromDict(bdev_s)
101
    if bdev is None:
102
      raise ValueError("can't unserialize data!")
103
    return backend.CreateBlockDevice(bdev, size, on_primary, info)
104

    
105
  @staticmethod
106
  def perspective_blockdev_remove(params):
107
    """Remove a block device.
108

    
109
    """
110
    bdev_s = params[0]
111
    bdev = objects.Disk.FromDict(bdev_s)
112
    return backend.RemoveBlockDevice(bdev)
113

    
114
  @staticmethod
115
  def perspective_blockdev_assemble(params):
116
    """Assemble a block device.
117

    
118
    """
119
    bdev_s, on_primary = params
120
    bdev = objects.Disk.FromDict(bdev_s)
121
    if bdev is None:
122
      raise ValueError("can't unserialize data!")
123
    return backend.AssembleBlockDevice(bdev, on_primary)
124

    
125
  @staticmethod
126
  def perspective_blockdev_shutdown(params):
127
    """Shutdown a block device.
128

    
129
    """
130
    bdev_s = params[0]
131
    bdev = objects.Disk.FromDict(bdev_s)
132
    if bdev is None:
133
      raise ValueError("can't unserialize data!")
134
    return backend.ShutdownBlockDevice(bdev)
135

    
136
  @staticmethod
137
  def perspective_blockdev_addchild(params):
138
    """Add a child to a mirror device.
139

    
140
    Note: this is only valid for mirror devices. It's the caller's duty
141
    to send a correct disk, otherwise we raise an error.
142

    
143
    """
144
    bdev_s, ndev_s = params
145
    bdev = objects.Disk.FromDict(bdev_s)
146
    ndev = objects.Disk.FromDict(ndev_s)
147
    if bdev is None or ndev is None:
148
      raise ValueError("can't unserialize data!")
149
    return backend.MirrorAddChild(bdev, ndev)
150

    
151
  @staticmethod
152
  def perspective_blockdev_removechild(params):
153
    """Remove a child from a mirror device.
154

    
155
    This is only valid for mirror devices, of course. It's the callers
156
    duty to send a correct disk, otherwise we raise an error.
157

    
158
    """
159
    bdev_s, ndev_s = params
160
    bdev = objects.Disk.FromDict(bdev_s)
161
    ndev = objects.Disk.FromDict(ndev_s)
162
    if bdev is None or ndev is None:
163
      raise ValueError("can't unserialize data!")
164
    return backend.MirrorRemoveChild(bdev, ndev)
165

    
166
  @staticmethod
167
  def perspective_blockdev_getmirrorstatus(params):
168
    """Return the mirror status for a list of disks.
169

    
170
    """
171
    disks = [objects.Disk.FromDict(dsk_s)
172
            for dsk_s in params]
173
    return backend.GetMirrorStatus(disks)
174

    
175
  @staticmethod
176
  def perspective_blockdev_find(params):
177
    """Expose the FindBlockDevice functionality for a disk.
178

    
179
    This will try to find but not activate a disk.
180

    
181
    """
182
    disk = objects.Disk.FromDict(params[0])
183
    return backend.FindBlockDevice(disk)
184

    
185
  @staticmethod
186
  def perspective_blockdev_snapshot(params):
187
    """Create a snapshot device.
188

    
189
    Note that this is only valid for LVM disks, if we get passed
190
    something else we raise an exception. The snapshot device can be
191
    remove by calling the generic block device remove call.
192

    
193
    """
194
    cfbd = objects.Disk.FromDict(params[0])
195
    return backend.SnapshotBlockDevice(cfbd)
196

    
197
  # export/import  --------------------------
198

    
199
  @staticmethod
200
  def perspective_snapshot_export(params):
201
    """Export a given snapshot.
202

    
203
    """
204
    disk = objects.Disk.FromDict(params[0])
205
    dest_node = params[1]
206
    instance = objects.Instance.FromDict(params[2])
207
    return backend.ExportSnapshot(disk, dest_node, instance)
208

    
209
  @staticmethod
210
  def perspective_finalize_export(params):
211
    """Expose the finalize export functionality.
212

    
213
    """
214
    instance = objects.Instance.FromDict(params[0])
215
    snap_disks = [objects.Disk.FromDict(str_data)
216
                  for str_data in params[1]]
217
    return backend.FinalizeExport(instance, snap_disks)
218

    
219
  @staticmethod
220
  def perspective_export_info(params):
221
    """Query information about an existing export on this node.
222

    
223
    The given path may not contain an export, in which case we return
224
    None.
225

    
226
    """
227
    path = params[0]
228
    einfo = backend.ExportInfo(path)
229
    if einfo is None:
230
      return einfo
231
    return einfo.Dumps()
232

    
233
  @staticmethod
234
  def perspective_export_list(params):
235
    """List the available exports on this node.
236

    
237
    Note that as opposed to export_info, which may query data about an
238
    export in any path, this only queries the standard Ganeti path
239
    (constants.EXPORT_DIR).
240

    
241
    """
242
    return backend.ListExports()
243

    
244
  @staticmethod
245
  def perspective_export_remove(params):
246
    """Remove an export.
247

    
248
    """
249
    export = params[0]
250
    return backend.RemoveExport(export)
251

    
252
  # volume  --------------------------
253

    
254
  @staticmethod
255
  def perspective_volume_list(params):
256
    """Query the list of logical volumes in a given volume group.
257

    
258
    """
259
    vgname = params[0]
260
    return backend.GetVolumeList(vgname)
261

    
262
  @staticmethod
263
  def perspective_vg_list(params):
264
    """Query the list of volume groups.
265

    
266
    """
267
    return backend.ListVolumeGroups()
268

    
269
  # bridge  --------------------------
270

    
271
  @staticmethod
272
  def perspective_bridges_exist(params):
273
    """Check if all bridges given exist on this node.
274

    
275
    """
276
    bridges_list = params[0]
277
    return backend.BridgesExist(bridges_list)
278

    
279
  # instance  --------------------------
280

    
281
  @staticmethod
282
  def perspective_instance_os_add(params):
283
    """Install an OS on a given instance.
284

    
285
    """
286
    inst_s, os_disk, swap_disk = params
287
    inst = objects.Instance.FromDict(inst_s)
288
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
289

    
290
  @staticmethod
291
  def perspective_instance_run_rename(params):
292
    """Runs the OS rename script for an instance.
293

    
294
    """
295
    inst_s, old_name, os_disk, swap_disk = params
296
    inst = objects.Instance.FromDict(inst_s)
297
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
298

    
299
  @staticmethod
300
  def perspective_instance_os_import(params):
301
    """Run the import function of an OS onto a given instance.
302

    
303
    """
304
    inst_s, os_disk, swap_disk, src_node, src_image = params
305
    inst = objects.Instance.FromDict(inst_s)
306
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
307
                                        src_node, src_image)
308

    
309
  @staticmethod
310
  def perspective_instance_shutdown(params):
311
    """Shutdown an instance.
312

    
313
    """
314
    instance = objects.Instance.FromDict(params[0])
315
    return backend.ShutdownInstance(instance)
316

    
317
  @staticmethod
318
  def perspective_instance_start(params):
319
    """Start an instance.
320

    
321
    """
322
    instance = objects.Instance.FromDict(params[0])
323
    extra_args = params[1]
324
    return backend.StartInstance(instance, extra_args)
325

    
326
  @staticmethod
327
  def perspective_instance_info(params):
328
    """Query instance information.
329

    
330
    """
331
    return backend.GetInstanceInfo(params[0])
332

    
333
  @staticmethod
334
  def perspective_all_instances_info(params):
335
    """Query information about all instances.
336

    
337
    """
338
    return backend.GetAllInstancesInfo()
339

    
340
  @staticmethod
341
  def perspective_instance_list(params):
342
    """Query the list of running instances.
343

    
344
    """
345
    return backend.GetInstanceList()
346

    
347
  # node --------------------------
348

    
349
  @staticmethod
350
  def perspective_node_info(params):
351
    """Query node information.
352

    
353
    """
354
    vgname = params[0]
355
    return backend.GetNodeInfo(vgname)
356

    
357
  @staticmethod
358
  def perspective_node_add(params):
359
    """Complete the registration of this node in the cluster.
360

    
361
    """
362
    return backend.AddNode(params[0], params[1], params[2],
363
                           params[3], params[4], params[5])
364

    
365
  @staticmethod
366
  def perspective_node_verify(params):
367
    """Run a verify sequence on this node.
368

    
369
    """
370
    return backend.VerifyNode(params[0])
371

    
372
  @staticmethod
373
  def perspective_node_start_master(params):
374
    """Promote this node to master status.
375

    
376
    """
377
    return backend.StartMaster()
378

    
379
  @staticmethod
380
  def perspective_node_stop_master(params):
381
    """Demote this node from master status.
382

    
383
    """
384
    return backend.StopMaster()
385

    
386
  @staticmethod
387
  def perspective_node_leave_cluster(params):
388
    """Cleanup after leaving a cluster.
389

    
390
    """
391
    return backend.LeaveCluster()
392

    
393
  @staticmethod
394
  def perspective_node_volumes(params):
395
    """Query the list of all logical volume groups.
396

    
397
    """
398
    return backend.NodeVolumes()
399

    
400
  # cluster --------------------------
401

    
402
  @staticmethod
403
  def perspective_version(params):
404
    """Query version information.
405

    
406
    """
407
    return constants.PROTOCOL_VERSION
408

    
409
  @staticmethod
410
  def perspective_upload_file(params):
411
    """Upload a file.
412

    
413
    Note that the backend implementation imposes strict rules on which
414
    files are accepted.
415

    
416
    """
417
    return backend.UploadFile(*params)
418

    
419

    
420
  # os -----------------------
421

    
422
  @staticmethod
423
  def perspective_os_diagnose(params):
424
    """Query detailed information about existing OSes.
425

    
426
    """
427
    os_list = backend.DiagnoseOS()
428
    if not os_list:
429
      # this catches also return values of 'False',
430
      # for which we can't iterate over
431
      return os_list
432
    result = []
433
    for data in os_list:
434
      if isinstance(data, objects.OS):
435
        result.append(data.ToDict())
436
      elif isinstance(data, errors.InvalidOS):
437
        result.append(data.args)
438
      else:
439
        raise errors.ProgrammerError("Invalid result from backend.DiagnoseOS"
440
                                     " (class %s, %s)" %
441
                                     (str(data.__class__), data))
442

    
443
    return result
444

    
445
  @staticmethod
446
  def perspective_os_get(params):
447
    """Query information about a given OS.
448

    
449
    """
450
    name = params[0]
451
    try:
452
      os_obj = backend.OSFromDisk(name).ToDict()
453
    except errors.InvalidOS, err:
454
      os_obj = err.args
455
    return os_obj
456

    
457
  # hooks -----------------------
458

    
459
  @staticmethod
460
  def perspective_hooks_runner(params):
461
    """Run hook scripts.
462

    
463
    """
464
    hpath, phase, env = params
465
    hr = backend.HooksRunner()
466
    return hr.RunHooks(hpath, phase, env)
467

    
468

    
469
class MyRealm:
470
  """Simple realm that forwards all requests to a ServerObject.
471

    
472
  """
473
  __implements__ = portal.IRealm
474

    
475
  def requestAvatar(self, avatarId, mind, *interfaces):
476
    """Return an avatar based on our ServerObject class.
477

    
478
    """
479
    if pb.IPerspective not in interfaces:
480
      raise NotImplementedError
481
    return pb.IPerspective, ServerObject(avatarId), lambda:None
482

    
483

    
484
def ParseOptions():
485
  """Parse the command line options.
486

    
487
  Returns:
488
    (options, args) as from OptionParser.parse_args()
489

    
490
  """
491
  parser = OptionParser(description="Ganeti node daemon",
492
                        usage="%prog [-f] [-d]",
493
                        version="%%prog (ganeti) %s" %
494
                        constants.RELEASE_VERSION)
495

    
496
  parser.add_option("-f", "--foreground", dest="fork",
497
                    help="Don't detach from the current terminal",
498
                    default=True, action="store_false")
499
  parser.add_option("-d", "--debug", dest="debug",
500
                    help="Enable some debug messages",
501
                    default=False, action="store_true")
502
  options, args = parser.parse_args()
503
  return options, args
504

    
505

    
506
def main():
507
  """Main function for the node daemon.
508

    
509
  """
510
  options, args = ParseOptions()
511
  for fname in (constants.SSL_CERT_FILE,):
512
    if not os.path.isfile(fname):
513
      print "config %s not there, will not run." % fname
514
      sys.exit(5)
515

    
516
  try:
517
    ss = ssconf.SimpleStore()
518
    port = ss.GetNodeDaemonPort()
519
    pwdata = ss.GetNodeDaemonPassword()
520
  except errors.ConfigurationError, err:
521
    print "Cluster configuration incomplete: '%s'" % str(err)
522
    sys.exit(5)
523

    
524
  # become a daemon
525
  if options.fork:
526
    createDaemon()
527

    
528
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
529
                      program="ganeti-noded")
530

    
531
  p = portal.Portal(MyRealm())
532
  p.registerChecker(
533
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
534
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
535
  reactor.run()
536

    
537

    
538
def createDaemon():
539
  """Detach a process from the controlling terminal and run it in the
540
  background as a daemon.
541

    
542
  """
543
  UMASK = 077
544
  WORKDIR = "/"
545
  # Default maximum for the number of available file descriptors.
546
  if 'SC_OPEN_MAX' in os.sysconf_names:
547
    try:
548
      MAXFD = os.sysconf('SC_OPEN_MAX')
549
      if MAXFD < 0:
550
        MAXFD = 1024
551
    except OSError:
552
      MAXFD = 1024
553
  else:
554
    MAXFD = 1024
555
  # The standard I/O file descriptors are redirected to /dev/null by default.
556
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
557
  REDIRECT_TO = constants.LOG_NODESERVER
558
  try:
559
    pid = os.fork()
560
  except OSError, e:
561
    raise Exception("%s [%d]" % (e.strerror, e.errno))
562
  if (pid == 0):  # The first child.
563
    os.setsid()
564
    try:
565
      pid = os.fork() # Fork a second child.
566
    except OSError, e:
567
      raise Exception("%s [%d]" % (e.strerror, e.errno))
568
    if (pid == 0):  # The second child.
569
      os.chdir(WORKDIR)
570
      os.umask(UMASK)
571
    else:
572
      # exit() or _exit()?  See below.
573
      os._exit(0) # Exit parent (the first child) of the second child.
574
  else:
575
    os._exit(0) # Exit parent of the first child.
576
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
577
  if (maxfd == resource.RLIM_INFINITY):
578
    maxfd = MAXFD
579

    
580
  # Iterate through and close all file descriptors.
581
  for fd in range(0, maxfd):
582
    try:
583
      os.close(fd)
584
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
585
      pass
586
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND) # standard input (0)
587
  # Duplicate standard input to standard output and standard error.
588
  os.dup2(0, 1)     # standard output (1)
589
  os.dup2(0, 2)     # standard error (2)
590
  return(0)
591

    
592

    
593
if __name__ == '__main__':
594
  main()