Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ b50f022f

History | View | Annotate | Download (15.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31

    
32
from optparse import OptionParser
33

    
34

    
35
from ganeti import backend
36
from ganeti import logger
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import errors
40
from ganeti import ssconf
41
from ganeti import utils
42

    
43
from twisted.spread import pb
44
from twisted.internet import reactor
45
from twisted.cred import checkers, portal
46
from OpenSSL import SSL
47

    
48

    
49
class ServerContextFactory:
50
  """SSL context factory class that uses a given certificate.
51

    
52
  """
53
  @staticmethod
54
  def getContext():
55
    """Return a customized context.
56

    
57
    The context will be set to use our certificate.
58

    
59
    """
60
    ctx = SSL.Context(SSL.TLSv1_METHOD)
61
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
62
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
63
    return ctx
64

    
65
class ServerObject(pb.Avatar):
66
  """The server implementation.
67

    
68
  This class holds all methods exposed over the RPC interface.
69

    
70
  """
71
  def __init__(self, name):
72
    self.name = name
73

    
74
  def perspectiveMessageReceived(self, broker, message, args, kw):
75
    """Custom message dispatching function.
76

    
77
    This function overrides the pb.Avatar function in order to provide
78
    a simple form of exception passing (as text only).
79

    
80
    """
81
    args = broker.unserialize(args, self)
82
    kw = broker.unserialize(kw, self)
83
    method = getattr(self, "perspective_%s" % message)
84
    tb = None
85
    state = None
86
    try:
87
      state = method(*args, **kw)
88
    except:
89
      tb = traceback.format_exc()
90

    
91
    return broker.serialize((tb, state), self, method, args, kw)
92

    
93
  # the new block devices  --------------------------
94

    
95
  @staticmethod
96
  def perspective_blockdev_create(params):
97
    """Create a block device.
98

    
99
    """
100
    bdev_s, size, on_primary, info = params
101
    bdev = objects.Disk.FromDict(bdev_s)
102
    if bdev is None:
103
      raise ValueError("can't unserialize data!")
104
    return backend.CreateBlockDevice(bdev, size, on_primary, info)
105

    
106
  @staticmethod
107
  def perspective_blockdev_remove(params):
108
    """Remove a block device.
109

    
110
    """
111
    bdev_s = params[0]
112
    bdev = objects.Disk.FromDict(bdev_s)
113
    return backend.RemoveBlockDevice(bdev)
114

    
115
  @staticmethod
116
  def perspective_blockdev_assemble(params):
117
    """Assemble a block device.
118

    
119
    """
120
    bdev_s, on_primary = params
121
    bdev = objects.Disk.FromDict(bdev_s)
122
    if bdev is None:
123
      raise ValueError("can't unserialize data!")
124
    return backend.AssembleBlockDevice(bdev, on_primary)
125

    
126
  @staticmethod
127
  def perspective_blockdev_shutdown(params):
128
    """Shutdown a block device.
129

    
130
    """
131
    bdev_s = params[0]
132
    bdev = objects.Disk.FromDict(bdev_s)
133
    if bdev is None:
134
      raise ValueError("can't unserialize data!")
135
    return backend.ShutdownBlockDevice(bdev)
136

    
137
  @staticmethod
138
  def perspective_blockdev_addchild(params):
139
    """Add a child to a mirror device.
140

    
141
    Note: this is only valid for mirror devices. It's the caller's duty
142
    to send a correct disk, otherwise we raise an error.
143

    
144
    """
145
    bdev_s, ndev_s = params
146
    bdev = objects.Disk.FromDict(bdev_s)
147
    ndev = objects.Disk.FromDict(ndev_s)
148
    if bdev is None or ndev is None:
149
      raise ValueError("can't unserialize data!")
150
    return backend.MirrorAddChild(bdev, ndev)
151

    
152
  @staticmethod
153
  def perspective_blockdev_removechild(params):
154
    """Remove a child from a mirror device.
155

    
156
    This is only valid for mirror devices, of course. It's the callers
157
    duty to send a correct disk, otherwise we raise an error.
158

    
159
    """
160
    bdev_s, ndev_s = params
161
    bdev = objects.Disk.FromDict(bdev_s)
162
    ndev = objects.Disk.FromDict(ndev_s)
163
    if bdev is None or ndev is None:
164
      raise ValueError("can't unserialize data!")
165
    return backend.MirrorRemoveChild(bdev, ndev)
166

    
167
  @staticmethod
168
  def perspective_blockdev_getmirrorstatus(params):
169
    """Return the mirror status for a list of disks.
170

    
171
    """
172
    disks = [objects.Disk.FromDict(dsk_s)
173
            for dsk_s in params]
174
    return backend.GetMirrorStatus(disks)
175

    
176
  @staticmethod
177
  def perspective_blockdev_find(params):
178
    """Expose the FindBlockDevice functionality for a disk.
179

    
180
    This will try to find but not activate a disk.
181

    
182
    """
183
    disk = objects.Disk.FromDict(params[0])
184
    return backend.FindBlockDevice(disk)
185

    
186
  @staticmethod
187
  def perspective_blockdev_snapshot(params):
188
    """Create a snapshot device.
189

    
190
    Note that this is only valid for LVM disks, if we get passed
191
    something else we raise an exception. The snapshot device can be
192
    remove by calling the generic block device remove call.
193

    
194
    """
195
    cfbd = objects.Disk.FromDict(params[0])
196
    return backend.SnapshotBlockDevice(cfbd)
197

    
198
  # export/import  --------------------------
199

    
200
  @staticmethod
201
  def perspective_snapshot_export(params):
202
    """Export a given snapshot.
203

    
204
    """
205
    disk = objects.Disk.FromDict(params[0])
206
    dest_node = params[1]
207
    instance = objects.Instance.FromDict(params[2])
208
    return backend.ExportSnapshot(disk, dest_node, instance)
209

    
210
  @staticmethod
211
  def perspective_finalize_export(params):
212
    """Expose the finalize export functionality.
213

    
214
    """
215
    instance = objects.Instance.FromDict(params[0])
216
    snap_disks = [objects.Disk.FromDict(str_data)
217
                  for str_data in params[1]]
218
    return backend.FinalizeExport(instance, snap_disks)
219

    
220
  @staticmethod
221
  def perspective_export_info(params):
222
    """Query information about an existing export on this node.
223

    
224
    The given path may not contain an export, in which case we return
225
    None.
226

    
227
    """
228
    path = params[0]
229
    einfo = backend.ExportInfo(path)
230
    if einfo is None:
231
      return einfo
232
    return einfo.Dumps()
233

    
234
  @staticmethod
235
  def perspective_export_list(params):
236
    """List the available exports on this node.
237

    
238
    Note that as opposed to export_info, which may query data about an
239
    export in any path, this only queries the standard Ganeti path
240
    (constants.EXPORT_DIR).
241

    
242
    """
243
    return backend.ListExports()
244

    
245
  @staticmethod
246
  def perspective_export_remove(params):
247
    """Remove an export.
248

    
249
    """
250
    export = params[0]
251
    return backend.RemoveExport(export)
252

    
253
  # volume  --------------------------
254

    
255
  @staticmethod
256
  def perspective_volume_list(params):
257
    """Query the list of logical volumes in a given volume group.
258

    
259
    """
260
    vgname = params[0]
261
    return backend.GetVolumeList(vgname)
262

    
263
  @staticmethod
264
  def perspective_vg_list(params):
265
    """Query the list of volume groups.
266

    
267
    """
268
    return backend.ListVolumeGroups()
269

    
270
  # bridge  --------------------------
271

    
272
  @staticmethod
273
  def perspective_bridges_exist(params):
274
    """Check if all bridges given exist on this node.
275

    
276
    """
277
    bridges_list = params[0]
278
    return backend.BridgesExist(bridges_list)
279

    
280
  # instance  --------------------------
281

    
282
  @staticmethod
283
  def perspective_instance_os_add(params):
284
    """Install an OS on a given instance.
285

    
286
    """
287
    inst_s, os_disk, swap_disk = params
288
    inst = objects.Instance.FromDict(inst_s)
289
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
290

    
291
  @staticmethod
292
  def perspective_instance_run_rename(params):
293
    """Runs the OS rename script for an instance.
294

    
295
    """
296
    inst_s, old_name, os_disk, swap_disk = params
297
    inst = objects.Instance.FromDict(inst_s)
298
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
299

    
300
  @staticmethod
301
  def perspective_instance_os_import(params):
302
    """Run the import function of an OS onto a given instance.
303

    
304
    """
305
    inst_s, os_disk, swap_disk, src_node, src_image = params
306
    inst = objects.Instance.FromDict(inst_s)
307
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
308
                                        src_node, src_image)
309

    
310
  @staticmethod
311
  def perspective_instance_shutdown(params):
312
    """Shutdown an instance.
313

    
314
    """
315
    instance = objects.Instance.FromDict(params[0])
316
    return backend.ShutdownInstance(instance)
317

    
318
  @staticmethod
319
  def perspective_instance_start(params):
320
    """Start an instance.
321

    
322
    """
323
    instance = objects.Instance.FromDict(params[0])
324
    extra_args = params[1]
325
    return backend.StartInstance(instance, extra_args)
326

    
327
  @staticmethod
328
  def perspective_instance_info(params):
329
    """Query instance information.
330

    
331
    """
332
    return backend.GetInstanceInfo(params[0])
333

    
334
  @staticmethod
335
  def perspective_all_instances_info(params):
336
    """Query information about all instances.
337

    
338
    """
339
    return backend.GetAllInstancesInfo()
340

    
341
  @staticmethod
342
  def perspective_instance_list(params):
343
    """Query the list of running instances.
344

    
345
    """
346
    return backend.GetInstanceList()
347

    
348
  # node --------------------------
349

    
350
  @staticmethod
351
  def perspective_node_tcp_ping(params):
352
    """Do a TcpPing on the remote node.
353

    
354
    """
355
    return utils.TcpPing(params[0], params[1], params[2],
356
                         timeout=params[3], live_port_needed=params[4])
357

    
358
  @staticmethod
359
  def perspective_node_info(params):
360
    """Query node information.
361

    
362
    """
363
    vgname = params[0]
364
    return backend.GetNodeInfo(vgname)
365

    
366
  @staticmethod
367
  def perspective_node_add(params):
368
    """Complete the registration of this node in the cluster.
369

    
370
    """
371
    return backend.AddNode(params[0], params[1], params[2],
372
                           params[3], params[4], params[5])
373

    
374
  @staticmethod
375
  def perspective_node_verify(params):
376
    """Run a verify sequence on this node.
377

    
378
    """
379
    return backend.VerifyNode(params[0])
380

    
381
  @staticmethod
382
  def perspective_node_start_master(params):
383
    """Promote this node to master status.
384

    
385
    """
386
    return backend.StartMaster()
387

    
388
  @staticmethod
389
  def perspective_node_stop_master(params):
390
    """Demote this node from master status.
391

    
392
    """
393
    return backend.StopMaster()
394

    
395
  @staticmethod
396
  def perspective_node_leave_cluster(params):
397
    """Cleanup after leaving a cluster.
398

    
399
    """
400
    return backend.LeaveCluster()
401

    
402
  @staticmethod
403
  def perspective_node_volumes(params):
404
    """Query the list of all logical volume groups.
405

    
406
    """
407
    return backend.NodeVolumes()
408

    
409
  # cluster --------------------------
410

    
411
  @staticmethod
412
  def perspective_version(params):
413
    """Query version information.
414

    
415
    """
416
    return constants.PROTOCOL_VERSION
417

    
418
  @staticmethod
419
  def perspective_upload_file(params):
420
    """Upload a file.
421

    
422
    Note that the backend implementation imposes strict rules on which
423
    files are accepted.
424

    
425
    """
426
    return backend.UploadFile(*params)
427

    
428

    
429
  # os -----------------------
430

    
431
  @staticmethod
432
  def perspective_os_diagnose(params):
433
    """Query detailed information about existing OSes.
434

    
435
    """
436
    os_list = backend.DiagnoseOS()
437
    if not os_list:
438
      # this catches also return values of 'False',
439
      # for which we can't iterate over
440
      return os_list
441
    result = []
442
    for data in os_list:
443
      if isinstance(data, objects.OS):
444
        result.append(data.ToDict())
445
      elif isinstance(data, errors.InvalidOS):
446
        result.append(data.args)
447
      else:
448
        raise errors.ProgrammerError("Invalid result from backend.DiagnoseOS"
449
                                     " (class %s, %s)" %
450
                                     (str(data.__class__), data))
451

    
452
    return result
453

    
454
  @staticmethod
455
  def perspective_os_get(params):
456
    """Query information about a given OS.
457

    
458
    """
459
    name = params[0]
460
    try:
461
      os_obj = backend.OSFromDisk(name).ToDict()
462
    except errors.InvalidOS, err:
463
      os_obj = err.args
464
    return os_obj
465

    
466
  # hooks -----------------------
467

    
468
  @staticmethod
469
  def perspective_hooks_runner(params):
470
    """Run hook scripts.
471

    
472
    """
473
    hpath, phase, env = params
474
    hr = backend.HooksRunner()
475
    return hr.RunHooks(hpath, phase, env)
476

    
477

    
478
class MyRealm:
479
  """Simple realm that forwards all requests to a ServerObject.
480

    
481
  """
482
  __implements__ = portal.IRealm
483

    
484
  def requestAvatar(self, avatarId, mind, *interfaces):
485
    """Return an avatar based on our ServerObject class.
486

    
487
    """
488
    if pb.IPerspective not in interfaces:
489
      raise NotImplementedError
490
    return pb.IPerspective, ServerObject(avatarId), lambda:None
491

    
492

    
493
def ParseOptions():
494
  """Parse the command line options.
495

    
496
  Returns:
497
    (options, args) as from OptionParser.parse_args()
498

    
499
  """
500
  parser = OptionParser(description="Ganeti node daemon",
501
                        usage="%prog [-f] [-d]",
502
                        version="%%prog (ganeti) %s" %
503
                        constants.RELEASE_VERSION)
504

    
505
  parser.add_option("-f", "--foreground", dest="fork",
506
                    help="Don't detach from the current terminal",
507
                    default=True, action="store_false")
508
  parser.add_option("-d", "--debug", dest="debug",
509
                    help="Enable some debug messages",
510
                    default=False, action="store_true")
511
  options, args = parser.parse_args()
512
  return options, args
513

    
514

    
515
def main():
516
  """Main function for the node daemon.
517

    
518
  """
519
  options, args = ParseOptions()
520
  for fname in (constants.SSL_CERT_FILE,):
521
    if not os.path.isfile(fname):
522
      print "config %s not there, will not run." % fname
523
      sys.exit(5)
524

    
525
  try:
526
    ss = ssconf.SimpleStore()
527
    port = ss.GetNodeDaemonPort()
528
    pwdata = ss.GetNodeDaemonPassword()
529
  except errors.ConfigurationError, err:
530
    print "Cluster configuration incomplete: '%s'" % str(err)
531
    sys.exit(5)
532

    
533
  # become a daemon
534
  if options.fork:
535
    createDaemon()
536

    
537
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
538
                      program="ganeti-noded")
539

    
540
  p = portal.Portal(MyRealm())
541
  p.registerChecker(
542
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
543
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
544
  reactor.run()
545

    
546

    
547
def createDaemon():
548
  """Detach a process from the controlling terminal and run it in the
549
  background as a daemon.
550

    
551
  """
552
  UMASK = 077
553
  WORKDIR = "/"
554
  # Default maximum for the number of available file descriptors.
555
  if 'SC_OPEN_MAX' in os.sysconf_names:
556
    try:
557
      MAXFD = os.sysconf('SC_OPEN_MAX')
558
      if MAXFD < 0:
559
        MAXFD = 1024
560
    except OSError:
561
      MAXFD = 1024
562
  else:
563
    MAXFD = 1024
564
  # The standard I/O file descriptors are redirected to /dev/null by default.
565
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
566
  REDIRECT_TO = constants.LOG_NODESERVER
567
  try:
568
    pid = os.fork()
569
  except OSError, e:
570
    raise Exception("%s [%d]" % (e.strerror, e.errno))
571
  if (pid == 0):  # The first child.
572
    os.setsid()
573
    try:
574
      pid = os.fork() # Fork a second child.
575
    except OSError, e:
576
      raise Exception("%s [%d]" % (e.strerror, e.errno))
577
    if (pid == 0):  # The second child.
578
      os.chdir(WORKDIR)
579
      os.umask(UMASK)
580
    else:
581
      # exit() or _exit()?  See below.
582
      os._exit(0) # Exit parent (the first child) of the second child.
583
  else:
584
    os._exit(0) # Exit parent of the first child.
585
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
586
  if (maxfd == resource.RLIM_INFINITY):
587
    maxfd = MAXFD
588

    
589
  # Iterate through and close all file descriptors.
590
  for fd in range(0, maxfd):
591
    try:
592
      os.close(fd)
593
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
594
      pass
595
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
596
  # Duplicate standard input to standard output and standard error.
597
  os.dup2(0, 1)     # standard output (1)
598
  os.dup2(0, 2)     # standard error (2)
599
  return(0)
600

    
601

    
602
if __name__ == '__main__':
603
  main()