Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 06009e27

History | View | Annotate | Download (16.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31

    
32
from optparse import OptionParser
33

    
34

    
35
from ganeti import backend
36
from ganeti import logger
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import errors
40
from ganeti import ssconf
41
from ganeti import utils
42

    
43
from twisted.spread import pb
44
from twisted.internet import reactor
45
from twisted.cred import checkers, portal
46
from OpenSSL import SSL
47

    
48

    
49
class ServerContextFactory:
50
  """SSL context factory class that uses a given certificate.
51

    
52
  """
53
  @staticmethod
54
  def getContext():
55
    """Return a customized context.
56

    
57
    The context will be set to use our certificate.
58

    
59
    """
60
    ctx = SSL.Context(SSL.TLSv1_METHOD)
61
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
62
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
63
    return ctx
64

    
65
class ServerObject(pb.Avatar):
66
  """The server implementation.
67

    
68
  This class holds all methods exposed over the RPC interface.
69

    
70
  """
71
  def __init__(self, name):
72
    self.name = name
73

    
74
  def perspectiveMessageReceived(self, broker, message, args, kw):
75
    """Custom message dispatching function.
76

    
77
    This function overrides the pb.Avatar function in order to provide
78
    a simple form of exception passing (as text only).
79

    
80
    """
81
    args = broker.unserialize(args, self)
82
    kw = broker.unserialize(kw, self)
83
    method = getattr(self, "perspective_%s" % message)
84
    tb = None
85
    state = None
86
    try:
87
      state = method(*args, **kw)
88
    except:
89
      tb = traceback.format_exc()
90

    
91
    return broker.serialize((tb, state), self, method, args, kw)
92

    
93
  # the new block devices  --------------------------
94

    
95
  @staticmethod
96
  def perspective_blockdev_create(params):
97
    """Create a block device.
98

    
99
    """
100
    bdev_s, size, owner, on_primary, info = params
101
    bdev = objects.Disk.FromDict(bdev_s)
102
    if bdev is None:
103
      raise ValueError("can't unserialize data!")
104
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
105

    
106
  @staticmethod
107
  def perspective_blockdev_remove(params):
108
    """Remove a block device.
109

    
110
    """
111
    bdev_s = params[0]
112
    bdev = objects.Disk.FromDict(bdev_s)
113
    return backend.RemoveBlockDevice(bdev)
114

    
115
  @staticmethod
116
  def perspective_blockdev_rename(params):
117
    """Remove a block device.
118

    
119
    """
120
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
121
    return backend.RenameBlockDevices(devlist)
122

    
123
  @staticmethod
124
  def perspective_blockdev_assemble(params):
125
    """Assemble a block device.
126

    
127
    """
128
    bdev_s, owner, on_primary = params
129
    bdev = objects.Disk.FromDict(bdev_s)
130
    if bdev is None:
131
      raise ValueError("can't unserialize data!")
132
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
133

    
134
  @staticmethod
135
  def perspective_blockdev_shutdown(params):
136
    """Shutdown a block device.
137

    
138
    """
139
    bdev_s = params[0]
140
    bdev = objects.Disk.FromDict(bdev_s)
141
    if bdev is None:
142
      raise ValueError("can't unserialize data!")
143
    return backend.ShutdownBlockDevice(bdev)
144

    
145
  @staticmethod
146
  def perspective_blockdev_addchildren(params):
147
    """Add a child to a mirror device.
148

    
149
    Note: this is only valid for mirror devices. It's the caller's duty
150
    to send a correct disk, otherwise we raise an error.
151

    
152
    """
153
    bdev_s, ndev_s = params
154
    bdev = objects.Disk.FromDict(bdev_s)
155
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156
    if bdev is None or ndevs.count(None) > 0:
157
      raise ValueError("can't unserialize data!")
158
    return backend.MirrorAddChildren(bdev, ndevs)
159

    
160
  @staticmethod
161
  def perspective_blockdev_removechildren(params):
162
    """Remove a child from a mirror device.
163

    
164
    This is only valid for mirror devices, of course. It's the callers
165
    duty to send a correct disk, otherwise we raise an error.
166

    
167
    """
168
    bdev_s, ndev_s = params
169
    bdev = objects.Disk.FromDict(bdev_s)
170
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
171
    if bdev is None or ndevs.count(None) > 0:
172
      raise ValueError("can't unserialize data!")
173
    return backend.MirrorRemoveChildren(bdev, ndevs)
174

    
175
  @staticmethod
176
  def perspective_blockdev_getmirrorstatus(params):
177
    """Return the mirror status for a list of disks.
178

    
179
    """
180
    disks = [objects.Disk.FromDict(dsk_s)
181
            for dsk_s in params]
182
    return backend.GetMirrorStatus(disks)
183

    
184
  @staticmethod
185
  def perspective_blockdev_find(params):
186
    """Expose the FindBlockDevice functionality for a disk.
187

    
188
    This will try to find but not activate a disk.
189

    
190
    """
191
    disk = objects.Disk.FromDict(params[0])
192
    return backend.FindBlockDevice(disk)
193

    
194
  @staticmethod
195
  def perspective_blockdev_snapshot(params):
196
    """Create a snapshot device.
197

    
198
    Note that this is only valid for LVM disks, if we get passed
199
    something else we raise an exception. The snapshot device can be
200
    remove by calling the generic block device remove call.
201

    
202
    """
203
    cfbd = objects.Disk.FromDict(params[0])
204
    return backend.SnapshotBlockDevice(cfbd)
205

    
206
  # export/import  --------------------------
207

    
208
  @staticmethod
209
  def perspective_snapshot_export(params):
210
    """Export a given snapshot.
211

    
212
    """
213
    disk = objects.Disk.FromDict(params[0])
214
    dest_node = params[1]
215
    instance = objects.Instance.FromDict(params[2])
216
    return backend.ExportSnapshot(disk, dest_node, instance)
217

    
218
  @staticmethod
219
  def perspective_finalize_export(params):
220
    """Expose the finalize export functionality.
221

    
222
    """
223
    instance = objects.Instance.FromDict(params[0])
224
    snap_disks = [objects.Disk.FromDict(str_data)
225
                  for str_data in params[1]]
226
    return backend.FinalizeExport(instance, snap_disks)
227

    
228
  @staticmethod
229
  def perspective_export_info(params):
230
    """Query information about an existing export on this node.
231

    
232
    The given path may not contain an export, in which case we return
233
    None.
234

    
235
    """
236
    path = params[0]
237
    einfo = backend.ExportInfo(path)
238
    if einfo is None:
239
      return einfo
240
    return einfo.Dumps()
241

    
242
  @staticmethod
243
  def perspective_export_list(params):
244
    """List the available exports on this node.
245

    
246
    Note that as opposed to export_info, which may query data about an
247
    export in any path, this only queries the standard Ganeti path
248
    (constants.EXPORT_DIR).
249

    
250
    """
251
    return backend.ListExports()
252

    
253
  @staticmethod
254
  def perspective_export_remove(params):
255
    """Remove an export.
256

    
257
    """
258
    export = params[0]
259
    return backend.RemoveExport(export)
260

    
261
  # volume  --------------------------
262

    
263
  @staticmethod
264
  def perspective_volume_list(params):
265
    """Query the list of logical volumes in a given volume group.
266

    
267
    """
268
    vgname = params[0]
269
    return backend.GetVolumeList(vgname)
270

    
271
  @staticmethod
272
  def perspective_vg_list(params):
273
    """Query the list of volume groups.
274

    
275
    """
276
    return backend.ListVolumeGroups()
277

    
278
  # bridge  --------------------------
279

    
280
  @staticmethod
281
  def perspective_bridges_exist(params):
282
    """Check if all bridges given exist on this node.
283

    
284
    """
285
    bridges_list = params[0]
286
    return backend.BridgesExist(bridges_list)
287

    
288
  # instance  --------------------------
289

    
290
  @staticmethod
291
  def perspective_instance_os_add(params):
292
    """Install an OS on a given instance.
293

    
294
    """
295
    inst_s, os_disk, swap_disk = params
296
    inst = objects.Instance.FromDict(inst_s)
297
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
298

    
299
  @staticmethod
300
  def perspective_instance_run_rename(params):
301
    """Runs the OS rename script for an instance.
302

    
303
    """
304
    inst_s, old_name, os_disk, swap_disk = params
305
    inst = objects.Instance.FromDict(inst_s)
306
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
307

    
308
  @staticmethod
309
  def perspective_instance_os_import(params):
310
    """Run the import function of an OS onto a given instance.
311

    
312
    """
313
    inst_s, os_disk, swap_disk, src_node, src_image = params
314
    inst = objects.Instance.FromDict(inst_s)
315
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
316
                                        src_node, src_image)
317

    
318
  @staticmethod
319
  def perspective_instance_shutdown(params):
320
    """Shutdown an instance.
321

    
322
    """
323
    instance = objects.Instance.FromDict(params[0])
324
    return backend.ShutdownInstance(instance)
325

    
326
  @staticmethod
327
  def perspective_instance_start(params):
328
    """Start an instance.
329

    
330
    """
331
    instance = objects.Instance.FromDict(params[0])
332
    extra_args = params[1]
333
    return backend.StartInstance(instance, extra_args)
334

    
335
  @staticmethod
336
  def perspective_instance_reboot(params):
337
    """Reboot an instance.
338

    
339
    """
340
    instance = objects.Instance.FromDict(params[0])
341
    reboot_type = params[1]
342
    extra_args = params[2]
343
    return backend.RebootInstance(instance, reboot_type, extra_args)
344

    
345
  @staticmethod
346
  def perspective_instance_info(params):
347
    """Query instance information.
348

    
349
    """
350
    return backend.GetInstanceInfo(params[0])
351

    
352
  @staticmethod
353
  def perspective_all_instances_info(params):
354
    """Query information about all instances.
355

    
356
    """
357
    return backend.GetAllInstancesInfo()
358

    
359
  @staticmethod
360
  def perspective_instance_list(params):
361
    """Query the list of running instances.
362

    
363
    """
364
    return backend.GetInstanceList()
365

    
366
  # node --------------------------
367

    
368
  @staticmethod
369
  def perspective_node_tcp_ping(params):
370
    """Do a TcpPing on the remote node.
371

    
372
    """
373
    return utils.TcpPing(params[0], params[1], params[2],
374
                         timeout=params[3], live_port_needed=params[4])
375

    
376
  @staticmethod
377
  def perspective_node_info(params):
378
    """Query node information.
379

    
380
    """
381
    vgname = params[0]
382
    return backend.GetNodeInfo(vgname)
383

    
384
  @staticmethod
385
  def perspective_node_add(params):
386
    """Complete the registration of this node in the cluster.
387

    
388
    """
389
    return backend.AddNode(params[0], params[1], params[2],
390
                           params[3], params[4], params[5])
391

    
392
  @staticmethod
393
  def perspective_node_verify(params):
394
    """Run a verify sequence on this node.
395

    
396
    """
397
    return backend.VerifyNode(params[0])
398

    
399
  @staticmethod
400
  def perspective_node_start_master(params):
401
    """Promote this node to master status.
402

    
403
    """
404
    return backend.StartMaster()
405

    
406
  @staticmethod
407
  def perspective_node_stop_master(params):
408
    """Demote this node from master status.
409

    
410
    """
411
    return backend.StopMaster()
412

    
413
  @staticmethod
414
  def perspective_node_leave_cluster(params):
415
    """Cleanup after leaving a cluster.
416

    
417
    """
418
    return backend.LeaveCluster()
419

    
420
  @staticmethod
421
  def perspective_node_volumes(params):
422
    """Query the list of all logical volume groups.
423

    
424
    """
425
    return backend.NodeVolumes()
426

    
427
  # cluster --------------------------
428

    
429
  @staticmethod
430
  def perspective_version(params):
431
    """Query version information.
432

    
433
    """
434
    return constants.PROTOCOL_VERSION
435

    
436
  @staticmethod
437
  def perspective_upload_file(params):
438
    """Upload a file.
439

    
440
    Note that the backend implementation imposes strict rules on which
441
    files are accepted.
442

    
443
    """
444
    return backend.UploadFile(*params)
445

    
446

    
447
  # os -----------------------
448

    
449
  @staticmethod
450
  def perspective_os_diagnose(params):
451
    """Query detailed information about existing OSes.
452

    
453
    """
454
    return [os.ToDict() for os in backend.DiagnoseOS()]
455

    
456
  @staticmethod
457
  def perspective_os_get(params):
458
    """Query information about a given OS.
459

    
460
    """
461
    name = params[0]
462
    try:
463
      os_obj = backend.OSFromDisk(name)
464
    except errors.InvalidOS, err:
465
      os_obj = objects.OS.FromInvalidOS(err)
466
    return os_obj.ToDict()
467

    
468
  # hooks -----------------------
469

    
470
  @staticmethod
471
  def perspective_hooks_runner(params):
472
    """Run hook scripts.
473

    
474
    """
475
    hpath, phase, env = params
476
    hr = backend.HooksRunner()
477
    return hr.RunHooks(hpath, phase, env)
478

    
479
  # test -----------------------
480

    
481
  @staticmethod
482
  def perspective_test_delay(params):
483
    """Run test delay.
484

    
485
    """
486
    duration = params[0]
487
    return utils.TestDelay(duration)
488

    
489

    
490
class MyRealm:
491
  """Simple realm that forwards all requests to a ServerObject.
492

    
493
  """
494
  __implements__ = portal.IRealm
495

    
496
  def requestAvatar(self, avatarId, mind, *interfaces):
497
    """Return an avatar based on our ServerObject class.
498

    
499
    """
500
    if pb.IPerspective not in interfaces:
501
      raise NotImplementedError
502
    return pb.IPerspective, ServerObject(avatarId), lambda:None
503

    
504

    
505
def ParseOptions():
506
  """Parse the command line options.
507

    
508
  Returns:
509
    (options, args) as from OptionParser.parse_args()
510

    
511
  """
512
  parser = OptionParser(description="Ganeti node daemon",
513
                        usage="%prog [-f] [-d]",
514
                        version="%%prog (ganeti) %s" %
515
                        constants.RELEASE_VERSION)
516

    
517
  parser.add_option("-f", "--foreground", dest="fork",
518
                    help="Don't detach from the current terminal",
519
                    default=True, action="store_false")
520
  parser.add_option("-d", "--debug", dest="debug",
521
                    help="Enable some debug messages",
522
                    default=False, action="store_true")
523
  options, args = parser.parse_args()
524
  return options, args
525

    
526

    
527
def main():
528
  """Main function for the node daemon.
529

    
530
  """
531
  options, args = ParseOptions()
532
  utils.debug = options.debug
533
  for fname in (constants.SSL_CERT_FILE,):
534
    if not os.path.isfile(fname):
535
      print "config %s not there, will not run." % fname
536
      sys.exit(5)
537

    
538
  try:
539
    ss = ssconf.SimpleStore()
540
    port = ss.GetNodeDaemonPort()
541
    pwdata = ss.GetNodeDaemonPassword()
542
  except errors.ConfigurationError, err:
543
    print "Cluster configuration incomplete: '%s'" % str(err)
544
    sys.exit(5)
545

    
546
  # become a daemon
547
  if options.fork:
548
    createDaemon()
549

    
550
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
551
                      program="ganeti-noded")
552

    
553
  p = portal.Portal(MyRealm())
554
  p.registerChecker(
555
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
556
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
557
  reactor.run()
558

    
559

    
560
def createDaemon():
561
  """Detach a process from the controlling terminal and run it in the
562
  background as a daemon.
563

    
564
  """
565
  UMASK = 077
566
  WORKDIR = "/"
567
  # Default maximum for the number of available file descriptors.
568
  if 'SC_OPEN_MAX' in os.sysconf_names:
569
    try:
570
      MAXFD = os.sysconf('SC_OPEN_MAX')
571
      if MAXFD < 0:
572
        MAXFD = 1024
573
    except OSError:
574
      MAXFD = 1024
575
  else:
576
    MAXFD = 1024
577
  # The standard I/O file descriptors are redirected to /dev/null by default.
578
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
579
  REDIRECT_TO = constants.LOG_NODESERVER
580
  try:
581
    pid = os.fork()
582
  except OSError, e:
583
    raise Exception("%s [%d]" % (e.strerror, e.errno))
584
  if (pid == 0):  # The first child.
585
    os.setsid()
586
    try:
587
      pid = os.fork() # Fork a second child.
588
    except OSError, e:
589
      raise Exception("%s [%d]" % (e.strerror, e.errno))
590
    if (pid == 0):  # The second child.
591
      os.chdir(WORKDIR)
592
      os.umask(UMASK)
593
    else:
594
      # exit() or _exit()?  See below.
595
      os._exit(0) # Exit parent (the first child) of the second child.
596
  else:
597
    os._exit(0) # Exit parent of the first child.
598
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
599
  if (maxfd == resource.RLIM_INFINITY):
600
    maxfd = MAXFD
601

    
602
  # Iterate through and close all file descriptors.
603
  for fd in range(0, maxfd):
604
    try:
605
      os.close(fd)
606
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
607
      pass
608
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
609
  # Duplicate standard input to standard output and standard error.
610
  os.dup2(0, 1)     # standard output (1)
611
  os.dup2(0, 2)     # standard error (2)
612
  return(0)
613

    
614

    
615
if __name__ == '__main__':
616
  main()