Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 153d9724

History | View | Annotate | Download (16.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31

    
32
from optparse import OptionParser
33

    
34

    
35
from ganeti import backend
36
from ganeti import logger
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import errors
40
from ganeti import ssconf
41
from ganeti import utils
42

    
43
from twisted.spread import pb
44
from twisted.internet import reactor
45
from twisted.cred import checkers, portal
46
from OpenSSL import SSL
47

    
48

    
49
class ServerContextFactory:
50
  """SSL context factory class that uses a given certificate.
51

    
52
  """
53
  @staticmethod
54
  def getContext():
55
    """Return a customized context.
56

    
57
    The context will be set to use our certificate.
58

    
59
    """
60
    ctx = SSL.Context(SSL.TLSv1_METHOD)
61
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
62
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
63
    return ctx
64

    
65
class ServerObject(pb.Avatar):
66
  """The server implementation.
67

    
68
  This class holds all methods exposed over the RPC interface.
69

    
70
  """
71
  def __init__(self, name):
72
    self.name = name
73

    
74
  def perspectiveMessageReceived(self, broker, message, args, kw):
75
    """Custom message dispatching function.
76

    
77
    This function overrides the pb.Avatar function in order to provide
78
    a simple form of exception passing (as text only).
79

    
80
    """
81
    args = broker.unserialize(args, self)
82
    kw = broker.unserialize(kw, self)
83
    method = getattr(self, "perspective_%s" % message)
84
    tb = None
85
    state = None
86
    try:
87
      state = method(*args, **kw)
88
    except:
89
      tb = traceback.format_exc()
90

    
91
    return broker.serialize((tb, state), self, method, args, kw)
92

    
93
  # the new block devices  --------------------------
94

    
95
  @staticmethod
96
  def perspective_blockdev_create(params):
97
    """Create a block device.
98

    
99
    """
100
    bdev_s, size, on_primary, info = params
101
    bdev = objects.Disk.FromDict(bdev_s)
102
    if bdev is None:
103
      raise ValueError("can't unserialize data!")
104
    return backend.CreateBlockDevice(bdev, size, on_primary, info)
105

    
106
  @staticmethod
107
  def perspective_blockdev_remove(params):
108
    """Remove a block device.
109

    
110
    """
111
    bdev_s = params[0]
112
    bdev = objects.Disk.FromDict(bdev_s)
113
    return backend.RemoveBlockDevice(bdev)
114

    
115
  @staticmethod
116
  def perspective_blockdev_assemble(params):
117
    """Assemble a block device.
118

    
119
    """
120
    bdev_s, on_primary = params
121
    bdev = objects.Disk.FromDict(bdev_s)
122
    if bdev is None:
123
      raise ValueError("can't unserialize data!")
124
    return backend.AssembleBlockDevice(bdev, on_primary)
125

    
126
  @staticmethod
127
  def perspective_blockdev_shutdown(params):
128
    """Shutdown a block device.
129

    
130
    """
131
    bdev_s = params[0]
132
    bdev = objects.Disk.FromDict(bdev_s)
133
    if bdev is None:
134
      raise ValueError("can't unserialize data!")
135
    return backend.ShutdownBlockDevice(bdev)
136

    
137
  @staticmethod
138
  def perspective_blockdev_addchildren(params):
139
    """Add a child to a mirror device.
140

    
141
    Note: this is only valid for mirror devices. It's the caller's duty
142
    to send a correct disk, otherwise we raise an error.
143

    
144
    """
145
    bdev_s, ndev_s = params
146
    bdev = objects.Disk.FromDict(bdev_s)
147
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
148
    if bdev is None or ndevs.count(None) > 0:
149
      raise ValueError("can't unserialize data!")
150
    return backend.MirrorAddChildren(bdev, ndevs)
151

    
152
  @staticmethod
153
  def perspective_blockdev_removechildren(params):
154
    """Remove a child from a mirror device.
155

    
156
    This is only valid for mirror devices, of course. It's the callers
157
    duty to send a correct disk, otherwise we raise an error.
158

    
159
    """
160
    bdev_s, ndev_s = params
161
    bdev = objects.Disk.FromDict(bdev_s)
162
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
163
    if bdev is None or ndevs.count(None) > 0:
164
      raise ValueError("can't unserialize data!")
165
    return backend.MirrorRemoveChildren(bdev, ndevs)
166

    
167
  @staticmethod
168
  def perspective_blockdev_getmirrorstatus(params):
169
    """Return the mirror status for a list of disks.
170

    
171
    """
172
    disks = [objects.Disk.FromDict(dsk_s)
173
            for dsk_s in params]
174
    return backend.GetMirrorStatus(disks)
175

    
176
  @staticmethod
177
  def perspective_blockdev_find(params):
178
    """Expose the FindBlockDevice functionality for a disk.
179

    
180
    This will try to find but not activate a disk.
181

    
182
    """
183
    disk = objects.Disk.FromDict(params[0])
184
    return backend.FindBlockDevice(disk)
185

    
186
  @staticmethod
187
  def perspective_blockdev_snapshot(params):
188
    """Create a snapshot device.
189

    
190
    Note that this is only valid for LVM disks, if we get passed
191
    something else we raise an exception. The snapshot device can be
192
    remove by calling the generic block device remove call.
193

    
194
    """
195
    cfbd = objects.Disk.FromDict(params[0])
196
    return backend.SnapshotBlockDevice(cfbd)
197

    
198
  # export/import  --------------------------
199

    
200
  @staticmethod
201
  def perspective_snapshot_export(params):
202
    """Export a given snapshot.
203

    
204
    """
205
    disk = objects.Disk.FromDict(params[0])
206
    dest_node = params[1]
207
    instance = objects.Instance.FromDict(params[2])
208
    return backend.ExportSnapshot(disk, dest_node, instance)
209

    
210
  @staticmethod
211
  def perspective_finalize_export(params):
212
    """Expose the finalize export functionality.
213

    
214
    """
215
    instance = objects.Instance.FromDict(params[0])
216
    snap_disks = [objects.Disk.FromDict(str_data)
217
                  for str_data in params[1]]
218
    return backend.FinalizeExport(instance, snap_disks)
219

    
220
  @staticmethod
221
  def perspective_export_info(params):
222
    """Query information about an existing export on this node.
223

    
224
    The given path may not contain an export, in which case we return
225
    None.
226

    
227
    """
228
    path = params[0]
229
    einfo = backend.ExportInfo(path)
230
    if einfo is None:
231
      return einfo
232
    return einfo.Dumps()
233

    
234
  @staticmethod
235
  def perspective_export_list(params):
236
    """List the available exports on this node.
237

    
238
    Note that as opposed to export_info, which may query data about an
239
    export in any path, this only queries the standard Ganeti path
240
    (constants.EXPORT_DIR).
241

    
242
    """
243
    return backend.ListExports()
244

    
245
  @staticmethod
246
  def perspective_export_remove(params):
247
    """Remove an export.
248

    
249
    """
250
    export = params[0]
251
    return backend.RemoveExport(export)
252

    
253
  # volume  --------------------------
254

    
255
  @staticmethod
256
  def perspective_volume_list(params):
257
    """Query the list of logical volumes in a given volume group.
258

    
259
    """
260
    vgname = params[0]
261
    return backend.GetVolumeList(vgname)
262

    
263
  @staticmethod
264
  def perspective_vg_list(params):
265
    """Query the list of volume groups.
266

    
267
    """
268
    return backend.ListVolumeGroups()
269

    
270
  # bridge  --------------------------
271

    
272
  @staticmethod
273
  def perspective_bridges_exist(params):
274
    """Check if all bridges given exist on this node.
275

    
276
    """
277
    bridges_list = params[0]
278
    return backend.BridgesExist(bridges_list)
279

    
280
  # instance  --------------------------
281

    
282
  @staticmethod
283
  def perspective_instance_os_add(params):
284
    """Install an OS on a given instance.
285

    
286
    """
287
    inst_s, os_disk, swap_disk = params
288
    inst = objects.Instance.FromDict(inst_s)
289
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
290

    
291
  @staticmethod
292
  def perspective_instance_run_rename(params):
293
    """Runs the OS rename script for an instance.
294

    
295
    """
296
    inst_s, old_name, os_disk, swap_disk = params
297
    inst = objects.Instance.FromDict(inst_s)
298
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
299

    
300
  @staticmethod
301
  def perspective_instance_os_import(params):
302
    """Run the import function of an OS onto a given instance.
303

    
304
    """
305
    inst_s, os_disk, swap_disk, src_node, src_image = params
306
    inst = objects.Instance.FromDict(inst_s)
307
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
308
                                        src_node, src_image)
309

    
310
  @staticmethod
311
  def perspective_instance_shutdown(params):
312
    """Shutdown an instance.
313

    
314
    """
315
    instance = objects.Instance.FromDict(params[0])
316
    return backend.ShutdownInstance(instance)
317

    
318
  @staticmethod
319
  def perspective_instance_start(params):
320
    """Start an instance.
321

    
322
    """
323
    instance = objects.Instance.FromDict(params[0])
324
    extra_args = params[1]
325
    return backend.StartInstance(instance, extra_args)
326

    
327
  @staticmethod
328
  def perspective_instance_reboot(params):
329
    """Reboot an instance.
330

    
331
    """
332
    instance = objects.Instance.FromDict(params[0])
333
    reboot_type = params[1]
334
    extra_args = params[2]
335
    return backend.RebootInstance(instance, reboot_type, extra_args)
336

    
337
  @staticmethod
338
  def perspective_instance_info(params):
339
    """Query instance information.
340

    
341
    """
342
    return backend.GetInstanceInfo(params[0])
343

    
344
  @staticmethod
345
  def perspective_all_instances_info(params):
346
    """Query information about all instances.
347

    
348
    """
349
    return backend.GetAllInstancesInfo()
350

    
351
  @staticmethod
352
  def perspective_instance_list(params):
353
    """Query the list of running instances.
354

    
355
    """
356
    return backend.GetInstanceList()
357

    
358
  # node --------------------------
359

    
360
  @staticmethod
361
  def perspective_node_tcp_ping(params):
362
    """Do a TcpPing on the remote node.
363

    
364
    """
365
    return utils.TcpPing(params[0], params[1], params[2],
366
                         timeout=params[3], live_port_needed=params[4])
367

    
368
  @staticmethod
369
  def perspective_node_info(params):
370
    """Query node information.
371

    
372
    """
373
    vgname = params[0]
374
    return backend.GetNodeInfo(vgname)
375

    
376
  @staticmethod
377
  def perspective_node_add(params):
378
    """Complete the registration of this node in the cluster.
379

    
380
    """
381
    return backend.AddNode(params[0], params[1], params[2],
382
                           params[3], params[4], params[5])
383

    
384
  @staticmethod
385
  def perspective_node_verify(params):
386
    """Run a verify sequence on this node.
387

    
388
    """
389
    return backend.VerifyNode(params[0])
390

    
391
  @staticmethod
392
  def perspective_node_start_master(params):
393
    """Promote this node to master status.
394

    
395
    """
396
    return backend.StartMaster()
397

    
398
  @staticmethod
399
  def perspective_node_stop_master(params):
400
    """Demote this node from master status.
401

    
402
    """
403
    return backend.StopMaster()
404

    
405
  @staticmethod
406
  def perspective_node_leave_cluster(params):
407
    """Cleanup after leaving a cluster.
408

    
409
    """
410
    return backend.LeaveCluster()
411

    
412
  @staticmethod
413
  def perspective_node_volumes(params):
414
    """Query the list of all logical volume groups.
415

    
416
    """
417
    return backend.NodeVolumes()
418

    
419
  # cluster --------------------------
420

    
421
  @staticmethod
422
  def perspective_version(params):
423
    """Query version information.
424

    
425
    """
426
    return constants.PROTOCOL_VERSION
427

    
428
  @staticmethod
429
  def perspective_upload_file(params):
430
    """Upload a file.
431

    
432
    Note that the backend implementation imposes strict rules on which
433
    files are accepted.
434

    
435
    """
436
    return backend.UploadFile(*params)
437

    
438

    
439
  # os -----------------------
440

    
441
  @staticmethod
442
  def perspective_os_diagnose(params):
443
    """Query detailed information about existing OSes.
444

    
445
    """
446
    os_list = backend.DiagnoseOS()
447
    if not os_list:
448
      # this catches also return values of 'False',
449
      # for which we can't iterate over
450
      return os_list
451
    result = []
452
    for data in os_list:
453
      if isinstance(data, objects.OS):
454
        result.append(data.ToDict())
455
      elif isinstance(data, errors.InvalidOS):
456
        result.append(data.args)
457
      else:
458
        raise errors.ProgrammerError("Invalid result from backend.DiagnoseOS"
459
                                     " (class %s, %s)" %
460
                                     (str(data.__class__), data))
461

    
462
    return result
463

    
464
  @staticmethod
465
  def perspective_os_get(params):
466
    """Query information about a given OS.
467

    
468
    """
469
    name = params[0]
470
    try:
471
      os_obj = backend.OSFromDisk(name).ToDict()
472
    except errors.InvalidOS, err:
473
      os_obj = err.args
474
    return os_obj
475

    
476
  # hooks -----------------------
477

    
478
  @staticmethod
479
  def perspective_hooks_runner(params):
480
    """Run hook scripts.
481

    
482
    """
483
    hpath, phase, env = params
484
    hr = backend.HooksRunner()
485
    return hr.RunHooks(hpath, phase, env)
486

    
487

    
488
class MyRealm:
489
  """Simple realm that forwards all requests to a ServerObject.
490

    
491
  """
492
  __implements__ = portal.IRealm
493

    
494
  def requestAvatar(self, avatarId, mind, *interfaces):
495
    """Return an avatar based on our ServerObject class.
496

    
497
    """
498
    if pb.IPerspective not in interfaces:
499
      raise NotImplementedError
500
    return pb.IPerspective, ServerObject(avatarId), lambda:None
501

    
502

    
503
def ParseOptions():
504
  """Parse the command line options.
505

    
506
  Returns:
507
    (options, args) as from OptionParser.parse_args()
508

    
509
  """
510
  parser = OptionParser(description="Ganeti node daemon",
511
                        usage="%prog [-f] [-d]",
512
                        version="%%prog (ganeti) %s" %
513
                        constants.RELEASE_VERSION)
514

    
515
  parser.add_option("-f", "--foreground", dest="fork",
516
                    help="Don't detach from the current terminal",
517
                    default=True, action="store_false")
518
  parser.add_option("-d", "--debug", dest="debug",
519
                    help="Enable some debug messages",
520
                    default=False, action="store_true")
521
  options, args = parser.parse_args()
522
  return options, args
523

    
524

    
525
def main():
526
  """Main function for the node daemon.
527

    
528
  """
529
  options, args = ParseOptions()
530
  for fname in (constants.SSL_CERT_FILE,):
531
    if not os.path.isfile(fname):
532
      print "config %s not there, will not run." % fname
533
      sys.exit(5)
534

    
535
  try:
536
    ss = ssconf.SimpleStore()
537
    port = ss.GetNodeDaemonPort()
538
    pwdata = ss.GetNodeDaemonPassword()
539
  except errors.ConfigurationError, err:
540
    print "Cluster configuration incomplete: '%s'" % str(err)
541
    sys.exit(5)
542

    
543
  # become a daemon
544
  if options.fork:
545
    createDaemon()
546

    
547
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
548
                      program="ganeti-noded")
549

    
550
  p = portal.Portal(MyRealm())
551
  p.registerChecker(
552
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
553
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
554
  reactor.run()
555

    
556

    
557
def createDaemon():
558
  """Detach a process from the controlling terminal and run it in the
559
  background as a daemon.
560

    
561
  """
562
  UMASK = 077
563
  WORKDIR = "/"
564
  # Default maximum for the number of available file descriptors.
565
  if 'SC_OPEN_MAX' in os.sysconf_names:
566
    try:
567
      MAXFD = os.sysconf('SC_OPEN_MAX')
568
      if MAXFD < 0:
569
        MAXFD = 1024
570
    except OSError:
571
      MAXFD = 1024
572
  else:
573
    MAXFD = 1024
574
  # The standard I/O file descriptors are redirected to /dev/null by default.
575
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
576
  REDIRECT_TO = constants.LOG_NODESERVER
577
  try:
578
    pid = os.fork()
579
  except OSError, e:
580
    raise Exception("%s [%d]" % (e.strerror, e.errno))
581
  if (pid == 0):  # The first child.
582
    os.setsid()
583
    try:
584
      pid = os.fork() # Fork a second child.
585
    except OSError, e:
586
      raise Exception("%s [%d]" % (e.strerror, e.errno))
587
    if (pid == 0):  # The second child.
588
      os.chdir(WORKDIR)
589
      os.umask(UMASK)
590
    else:
591
      # exit() or _exit()?  See below.
592
      os._exit(0) # Exit parent (the first child) of the second child.
593
  else:
594
    os._exit(0) # Exit parent of the first child.
595
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
596
  if (maxfd == resource.RLIM_INFINITY):
597
    maxfd = MAXFD
598

    
599
  # Iterate through and close all file descriptors.
600
  for fd in range(0, maxfd):
601
    try:
602
      os.close(fd)
603
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
604
      pass
605
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
606
  # Duplicate standard input to standard output and standard error.
607
  os.dup2(0, 1)     # standard output (1)
608
  os.dup2(0, 2)     # standard error (2)
609
  return(0)
610

    
611

    
612
if __name__ == '__main__':
613
  main()