Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ dfa96ded

History | View | Annotate | Download (15.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31

    
32
from optparse import OptionParser
33

    
34

    
35
from ganeti import backend
36
from ganeti import logger
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import errors
40
from ganeti import ssconf
41
from ganeti import utils
42

    
43
from twisted.spread import pb
44
from twisted.internet import reactor
45
from twisted.cred import checkers, portal
46
from OpenSSL import SSL
47

    
48

    
49
class ServerContextFactory:
50
  """SSL context factory class that uses a given certificate.
51

    
52
  """
53
  @staticmethod
54
  def getContext():
55
    """Return a customized context.
56

    
57
    The context will be set to use our certificate.
58

    
59
    """
60
    ctx = SSL.Context(SSL.TLSv1_METHOD)
61
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
62
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
63
    return ctx
64

    
65
class ServerObject(pb.Avatar):
66
  """The server implementation.
67

    
68
  This class holds all methods exposed over the RPC interface.
69

    
70
  """
71
  def __init__(self, name):
72
    self.name = name
73

    
74
  def perspectiveMessageReceived(self, broker, message, args, kw):
75
    """Custom message dispatching function.
76

    
77
    This function overrides the pb.Avatar function in order to provide
78
    a simple form of exception passing (as text only).
79

    
80
    """
81
    args = broker.unserialize(args, self)
82
    kw = broker.unserialize(kw, self)
83
    method = getattr(self, "perspective_%s" % message)
84
    tb = None
85
    state = None
86
    try:
87
      state = method(*args, **kw)
88
    except:
89
      tb = traceback.format_exc()
90

    
91
    return broker.serialize((tb, state), self, method, args, kw)
92

    
93
  # the new block devices  --------------------------
94

    
95
  @staticmethod
96
  def perspective_blockdev_create(params):
97
    """Create a block device.
98

    
99
    """
100
    bdev_s, size, owner, on_primary, info = params
101
    bdev = objects.Disk.FromDict(bdev_s)
102
    if bdev is None:
103
      raise ValueError("can't unserialize data!")
104
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
105

    
106
  @staticmethod
107
  def perspective_blockdev_remove(params):
108
    """Remove a block device.
109

    
110
    """
111
    bdev_s = params[0]
112
    bdev = objects.Disk.FromDict(bdev_s)
113
    return backend.RemoveBlockDevice(bdev)
114

    
115
  @staticmethod
116
  def perspective_blockdev_rename(params):
117
    """Remove a block device.
118

    
119
    """
120
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
121
    return backend.RenameBlockDevices(devlist)
122

    
123
  @staticmethod
124
  def perspective_blockdev_assemble(params):
125
    """Assemble a block device.
126

    
127
    """
128
    bdev_s, owner, on_primary = params
129
    bdev = objects.Disk.FromDict(bdev_s)
130
    if bdev is None:
131
      raise ValueError("can't unserialize data!")
132
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
133

    
134
  @staticmethod
135
  def perspective_blockdev_shutdown(params):
136
    """Shutdown a block device.
137

    
138
    """
139
    bdev_s = params[0]
140
    bdev = objects.Disk.FromDict(bdev_s)
141
    if bdev is None:
142
      raise ValueError("can't unserialize data!")
143
    return backend.ShutdownBlockDevice(bdev)
144

    
145
  @staticmethod
146
  def perspective_blockdev_addchildren(params):
147
    """Add a child to a mirror device.
148

    
149
    Note: this is only valid for mirror devices. It's the caller's duty
150
    to send a correct disk, otherwise we raise an error.
151

    
152
    """
153
    bdev_s, ndev_s = params
154
    bdev = objects.Disk.FromDict(bdev_s)
155
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156
    if bdev is None or ndevs.count(None) > 0:
157
      raise ValueError("can't unserialize data!")
158
    return backend.MirrorAddChildren(bdev, ndevs)
159

    
160
  @staticmethod
161
  def perspective_blockdev_removechildren(params):
162
    """Remove a child from a mirror device.
163

    
164
    This is only valid for mirror devices, of course. It's the callers
165
    duty to send a correct disk, otherwise we raise an error.
166

    
167
    """
168
    bdev_s, ndev_s = params
169
    bdev = objects.Disk.FromDict(bdev_s)
170
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
171
    if bdev is None or ndevs.count(None) > 0:
172
      raise ValueError("can't unserialize data!")
173
    return backend.MirrorRemoveChildren(bdev, ndevs)
174

    
175
  @staticmethod
176
  def perspective_blockdev_getmirrorstatus(params):
177
    """Return the mirror status for a list of disks.
178

    
179
    """
180
    disks = [objects.Disk.FromDict(dsk_s)
181
            for dsk_s in params]
182
    return backend.GetMirrorStatus(disks)
183

    
184
  @staticmethod
185
  def perspective_blockdev_find(params):
186
    """Expose the FindBlockDevice functionality for a disk.
187

    
188
    This will try to find but not activate a disk.
189

    
190
    """
191
    disk = objects.Disk.FromDict(params[0])
192
    return backend.FindBlockDevice(disk)
193

    
194
  @staticmethod
195
  def perspective_blockdev_snapshot(params):
196
    """Create a snapshot device.
197

    
198
    Note that this is only valid for LVM disks, if we get passed
199
    something else we raise an exception. The snapshot device can be
200
    remove by calling the generic block device remove call.
201

    
202
    """
203
    cfbd = objects.Disk.FromDict(params[0])
204
    return backend.SnapshotBlockDevice(cfbd)
205

    
206
  # export/import  --------------------------
207

    
208
  @staticmethod
209
  def perspective_snapshot_export(params):
210
    """Export a given snapshot.
211

    
212
    """
213
    disk = objects.Disk.FromDict(params[0])
214
    dest_node = params[1]
215
    instance = objects.Instance.FromDict(params[2])
216
    return backend.ExportSnapshot(disk, dest_node, instance)
217

    
218
  @staticmethod
219
  def perspective_finalize_export(params):
220
    """Expose the finalize export functionality.
221

    
222
    """
223
    instance = objects.Instance.FromDict(params[0])
224
    snap_disks = [objects.Disk.FromDict(str_data)
225
                  for str_data in params[1]]
226
    return backend.FinalizeExport(instance, snap_disks)
227

    
228
  @staticmethod
229
  def perspective_export_info(params):
230
    """Query information about an existing export on this node.
231

    
232
    The given path may not contain an export, in which case we return
233
    None.
234

    
235
    """
236
    path = params[0]
237
    einfo = backend.ExportInfo(path)
238
    if einfo is None:
239
      return einfo
240
    return einfo.Dumps()
241

    
242
  @staticmethod
243
  def perspective_export_list(params):
244
    """List the available exports on this node.
245

    
246
    Note that as opposed to export_info, which may query data about an
247
    export in any path, this only queries the standard Ganeti path
248
    (constants.EXPORT_DIR).
249

    
250
    """
251
    return backend.ListExports()
252

    
253
  @staticmethod
254
  def perspective_export_remove(params):
255
    """Remove an export.
256

    
257
    """
258
    export = params[0]
259
    return backend.RemoveExport(export)
260

    
261
  # volume  --------------------------
262

    
263
  @staticmethod
264
  def perspective_volume_list(params):
265
    """Query the list of logical volumes in a given volume group.
266

    
267
    """
268
    vgname = params[0]
269
    return backend.GetVolumeList(vgname)
270

    
271
  @staticmethod
272
  def perspective_vg_list(params):
273
    """Query the list of volume groups.
274

    
275
    """
276
    return backend.ListVolumeGroups()
277

    
278
  # bridge  --------------------------
279

    
280
  @staticmethod
281
  def perspective_bridges_exist(params):
282
    """Check if all bridges given exist on this node.
283

    
284
    """
285
    bridges_list = params[0]
286
    return backend.BridgesExist(bridges_list)
287

    
288
  # instance  --------------------------
289

    
290
  @staticmethod
291
  def perspective_instance_os_add(params):
292
    """Install an OS on a given instance.
293

    
294
    """
295
    inst_s, os_disk, swap_disk = params
296
    inst = objects.Instance.FromDict(inst_s)
297
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
298

    
299
  @staticmethod
300
  def perspective_instance_run_rename(params):
301
    """Runs the OS rename script for an instance.
302

    
303
    """
304
    inst_s, old_name, os_disk, swap_disk = params
305
    inst = objects.Instance.FromDict(inst_s)
306
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
307

    
308
  @staticmethod
309
  def perspective_instance_os_import(params):
310
    """Run the import function of an OS onto a given instance.
311

    
312
    """
313
    inst_s, os_disk, swap_disk, src_node, src_image = params
314
    inst = objects.Instance.FromDict(inst_s)
315
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
316
                                        src_node, src_image)
317

    
318
  @staticmethod
319
  def perspective_instance_shutdown(params):
320
    """Shutdown an instance.
321

    
322
    """
323
    instance = objects.Instance.FromDict(params[0])
324
    return backend.ShutdownInstance(instance)
325

    
326
  @staticmethod
327
  def perspective_instance_start(params):
328
    """Start an instance.
329

    
330
    """
331
    instance = objects.Instance.FromDict(params[0])
332
    extra_args = params[1]
333
    return backend.StartInstance(instance, extra_args)
334

    
335
  @staticmethod
336
  def perspective_instance_reboot(params):
337
    """Reboot an instance.
338

    
339
    """
340
    instance = objects.Instance.FromDict(params[0])
341
    reboot_type = params[1]
342
    extra_args = params[2]
343
    return backend.RebootInstance(instance, reboot_type, extra_args)
344

    
345
  @staticmethod
346
  def perspective_instance_info(params):
347
    """Query instance information.
348

    
349
    """
350
    return backend.GetInstanceInfo(params[0])
351

    
352
  @staticmethod
353
  def perspective_all_instances_info(params):
354
    """Query information about all instances.
355

    
356
    """
357
    return backend.GetAllInstancesInfo()
358

    
359
  @staticmethod
360
  def perspective_instance_list(params):
361
    """Query the list of running instances.
362

    
363
    """
364
    return backend.GetInstanceList()
365

    
366
  # node --------------------------
367

    
368
  @staticmethod
369
  def perspective_node_tcp_ping(params):
370
    """Do a TcpPing on the remote node.
371

    
372
    """
373
    return utils.TcpPing(params[0], params[1], params[2],
374
                         timeout=params[3], live_port_needed=params[4])
375

    
376
  @staticmethod
377
  def perspective_node_info(params):
378
    """Query node information.
379

    
380
    """
381
    vgname = params[0]
382
    return backend.GetNodeInfo(vgname)
383

    
384
  @staticmethod
385
  def perspective_node_add(params):
386
    """Complete the registration of this node in the cluster.
387

    
388
    """
389
    return backend.AddNode(params[0], params[1], params[2],
390
                           params[3], params[4], params[5])
391

    
392
  @staticmethod
393
  def perspective_node_verify(params):
394
    """Run a verify sequence on this node.
395

    
396
    """
397
    return backend.VerifyNode(params[0])
398

    
399
  @staticmethod
400
  def perspective_node_start_master(params):
401
    """Promote this node to master status.
402

    
403
    """
404
    return backend.StartMaster()
405

    
406
  @staticmethod
407
  def perspective_node_stop_master(params):
408
    """Demote this node from master status.
409

    
410
    """
411
    return backend.StopMaster()
412

    
413
  @staticmethod
414
  def perspective_node_leave_cluster(params):
415
    """Cleanup after leaving a cluster.
416

    
417
    """
418
    return backend.LeaveCluster()
419

    
420
  @staticmethod
421
  def perspective_node_volumes(params):
422
    """Query the list of all logical volume groups.
423

    
424
    """
425
    return backend.NodeVolumes()
426

    
427
  # cluster --------------------------
428

    
429
  @staticmethod
430
  def perspective_version(params):
431
    """Query version information.
432

    
433
    """
434
    return constants.PROTOCOL_VERSION
435

    
436
  @staticmethod
437
  def perspective_upload_file(params):
438
    """Upload a file.
439

    
440
    Note that the backend implementation imposes strict rules on which
441
    files are accepted.
442

    
443
    """
444
    return backend.UploadFile(*params)
445

    
446

    
447
  # os -----------------------
448

    
449
  @staticmethod
450
  def perspective_os_diagnose(params):
451
    """Query detailed information about existing OSes.
452

    
453
    """
454
    return [os.ToDict() for os in backend.DiagnoseOS()]
455

    
456
  @staticmethod
457
  def perspective_os_get(params):
458
    """Query information about a given OS.
459

    
460
    """
461
    name = params[0]
462
    try:
463
      os_obj = backend.OSFromDisk(name)
464
    except errors.InvalidOS, err:
465
      os_obj = objects.OS.FromInvalidOS(err)
466
    return os_obj.ToDict()
467

    
468
  # hooks -----------------------
469

    
470
  @staticmethod
471
  def perspective_hooks_runner(params):
472
    """Run hook scripts.
473

    
474
    """
475
    hpath, phase, env = params
476
    hr = backend.HooksRunner()
477
    return hr.RunHooks(hpath, phase, env)
478

    
479

    
480
class MyRealm:
481
  """Simple realm that forwards all requests to a ServerObject.
482

    
483
  """
484
  __implements__ = portal.IRealm
485

    
486
  def requestAvatar(self, avatarId, mind, *interfaces):
487
    """Return an avatar based on our ServerObject class.
488

    
489
    """
490
    if pb.IPerspective not in interfaces:
491
      raise NotImplementedError
492
    return pb.IPerspective, ServerObject(avatarId), lambda:None
493

    
494

    
495
def ParseOptions():
496
  """Parse the command line options.
497

    
498
  Returns:
499
    (options, args) as from OptionParser.parse_args()
500

    
501
  """
502
  parser = OptionParser(description="Ganeti node daemon",
503
                        usage="%prog [-f] [-d]",
504
                        version="%%prog (ganeti) %s" %
505
                        constants.RELEASE_VERSION)
506

    
507
  parser.add_option("-f", "--foreground", dest="fork",
508
                    help="Don't detach from the current terminal",
509
                    default=True, action="store_false")
510
  parser.add_option("-d", "--debug", dest="debug",
511
                    help="Enable some debug messages",
512
                    default=False, action="store_true")
513
  options, args = parser.parse_args()
514
  return options, args
515

    
516

    
517
def main():
518
  """Main function for the node daemon.
519

    
520
  """
521
  options, args = ParseOptions()
522
  for fname in (constants.SSL_CERT_FILE,):
523
    if not os.path.isfile(fname):
524
      print "config %s not there, will not run." % fname
525
      sys.exit(5)
526

    
527
  try:
528
    ss = ssconf.SimpleStore()
529
    port = ss.GetNodeDaemonPort()
530
    pwdata = ss.GetNodeDaemonPassword()
531
  except errors.ConfigurationError, err:
532
    print "Cluster configuration incomplete: '%s'" % str(err)
533
    sys.exit(5)
534

    
535
  # become a daemon
536
  if options.fork:
537
    createDaemon()
538

    
539
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
540
                      program="ganeti-noded")
541

    
542
  p = portal.Portal(MyRealm())
543
  p.registerChecker(
544
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
545
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
546
  reactor.run()
547

    
548

    
549
def createDaemon():
550
  """Detach a process from the controlling terminal and run it in the
551
  background as a daemon.
552

    
553
  """
554
  UMASK = 077
555
  WORKDIR = "/"
556
  # Default maximum for the number of available file descriptors.
557
  if 'SC_OPEN_MAX' in os.sysconf_names:
558
    try:
559
      MAXFD = os.sysconf('SC_OPEN_MAX')
560
      if MAXFD < 0:
561
        MAXFD = 1024
562
    except OSError:
563
      MAXFD = 1024
564
  else:
565
    MAXFD = 1024
566
  # The standard I/O file descriptors are redirected to /dev/null by default.
567
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
568
  REDIRECT_TO = constants.LOG_NODESERVER
569
  try:
570
    pid = os.fork()
571
  except OSError, e:
572
    raise Exception("%s [%d]" % (e.strerror, e.errno))
573
  if (pid == 0):  # The first child.
574
    os.setsid()
575
    try:
576
      pid = os.fork() # Fork a second child.
577
    except OSError, e:
578
      raise Exception("%s [%d]" % (e.strerror, e.errno))
579
    if (pid == 0):  # The second child.
580
      os.chdir(WORKDIR)
581
      os.umask(UMASK)
582
    else:
583
      # exit() or _exit()?  See below.
584
      os._exit(0) # Exit parent (the first child) of the second child.
585
  else:
586
    os._exit(0) # Exit parent of the first child.
587
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
588
  if (maxfd == resource.RLIM_INFINITY):
589
    maxfd = MAXFD
590

    
591
  # Iterate through and close all file descriptors.
592
  for fd in range(0, maxfd):
593
    try:
594
      os.close(fd)
595
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
596
      pass
597
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
598
  # Duplicate standard input to standard output and standard error.
599
  os.dup2(0, 1)     # standard output (1)
600
  os.dup2(0, 2)     # standard error (2)
601
  return(0)
602

    
603

    
604
if __name__ == '__main__':
605
  main()