Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ f3e513ad

History | View | Annotate | Download (16.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31

    
32
from optparse import OptionParser
33

    
34

    
35
from ganeti import backend
36
from ganeti import logger
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import errors
40
from ganeti import ssconf
41
from ganeti import utils
42

    
43
from twisted.spread import pb
44
from twisted.internet import reactor
45
from twisted.cred import checkers, portal
46
from OpenSSL import SSL
47

    
48

    
49
class ServerContextFactory:
50
  """SSL context factory class that uses a given certificate.
51

    
52
  """
53
  @staticmethod
54
  def getContext():
55
    """Return a customized context.
56

    
57
    The context will be set to use our certificate.
58

    
59
    """
60
    ctx = SSL.Context(SSL.TLSv1_METHOD)
61
    ctx.use_certificate_file(constants.SSL_CERT_FILE)
62
    ctx.use_privatekey_file(constants.SSL_CERT_FILE)
63
    return ctx
64

    
65
class ServerObject(pb.Avatar):
66
  """The server implementation.
67

    
68
  This class holds all methods exposed over the RPC interface.
69

    
70
  """
71
  def __init__(self, name):
72
    self.name = name
73

    
74
  def perspectiveMessageReceived(self, broker, message, args, kw):
75
    """Custom message dispatching function.
76

    
77
    This function overrides the pb.Avatar function in order to provide
78
    a simple form of exception passing (as text only).
79

    
80
    """
81
    args = broker.unserialize(args, self)
82
    kw = broker.unserialize(kw, self)
83
    method = getattr(self, "perspective_%s" % message)
84
    tb = None
85
    state = None
86
    try:
87
      state = method(*args, **kw)
88
    except:
89
      tb = traceback.format_exc()
90

    
91
    return broker.serialize((tb, state), self, method, args, kw)
92

    
93
  # the new block devices  --------------------------
94

    
95
  @staticmethod
96
  def perspective_blockdev_create(params):
97
    """Create a block device.
98

    
99
    """
100
    bdev_s, size, on_primary, info = params
101
    bdev = objects.Disk.FromDict(bdev_s)
102
    if bdev is None:
103
      raise ValueError("can't unserialize data!")
104
    return backend.CreateBlockDevice(bdev, size, on_primary, info)
105

    
106
  @staticmethod
107
  def perspective_blockdev_remove(params):
108
    """Remove a block device.
109

    
110
    """
111
    bdev_s = params[0]
112
    bdev = objects.Disk.FromDict(bdev_s)
113
    return backend.RemoveBlockDevice(bdev)
114

    
115
  @staticmethod
116
  def perspective_blockdev_rename(params):
117
    """Remove a block device.
118

    
119
    """
120
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
121
    return backend.RenameBlockDevices(devlist)
122

    
123
  @staticmethod
124
  def perspective_blockdev_assemble(params):
125
    """Assemble a block device.
126

    
127
    """
128
    bdev_s, on_primary = params
129
    bdev = objects.Disk.FromDict(bdev_s)
130
    if bdev is None:
131
      raise ValueError("can't unserialize data!")
132
    return backend.AssembleBlockDevice(bdev, on_primary)
133

    
134
  @staticmethod
135
  def perspective_blockdev_shutdown(params):
136
    """Shutdown a block device.
137

    
138
    """
139
    bdev_s = params[0]
140
    bdev = objects.Disk.FromDict(bdev_s)
141
    if bdev is None:
142
      raise ValueError("can't unserialize data!")
143
    return backend.ShutdownBlockDevice(bdev)
144

    
145
  @staticmethod
146
  def perspective_blockdev_addchildren(params):
147
    """Add a child to a mirror device.
148

    
149
    Note: this is only valid for mirror devices. It's the caller's duty
150
    to send a correct disk, otherwise we raise an error.
151

    
152
    """
153
    bdev_s, ndev_s = params
154
    bdev = objects.Disk.FromDict(bdev_s)
155
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156
    if bdev is None or ndevs.count(None) > 0:
157
      raise ValueError("can't unserialize data!")
158
    return backend.MirrorAddChildren(bdev, ndevs)
159

    
160
  @staticmethod
161
  def perspective_blockdev_removechildren(params):
162
    """Remove a child from a mirror device.
163

    
164
    This is only valid for mirror devices, of course. It's the callers
165
    duty to send a correct disk, otherwise we raise an error.
166

    
167
    """
168
    bdev_s, ndev_s = params
169
    bdev = objects.Disk.FromDict(bdev_s)
170
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
171
    if bdev is None or ndevs.count(None) > 0:
172
      raise ValueError("can't unserialize data!")
173
    return backend.MirrorRemoveChildren(bdev, ndevs)
174

    
175
  @staticmethod
176
  def perspective_blockdev_getmirrorstatus(params):
177
    """Return the mirror status for a list of disks.
178

    
179
    """
180
    disks = [objects.Disk.FromDict(dsk_s)
181
            for dsk_s in params]
182
    return backend.GetMirrorStatus(disks)
183

    
184
  @staticmethod
185
  def perspective_blockdev_find(params):
186
    """Expose the FindBlockDevice functionality for a disk.
187

    
188
    This will try to find but not activate a disk.
189

    
190
    """
191
    disk = objects.Disk.FromDict(params[0])
192
    return backend.FindBlockDevice(disk)
193

    
194
  @staticmethod
195
  def perspective_blockdev_snapshot(params):
196
    """Create a snapshot device.
197

    
198
    Note that this is only valid for LVM disks, if we get passed
199
    something else we raise an exception. The snapshot device can be
200
    remove by calling the generic block device remove call.
201

    
202
    """
203
    cfbd = objects.Disk.FromDict(params[0])
204
    return backend.SnapshotBlockDevice(cfbd)
205

    
206
  # export/import  --------------------------
207

    
208
  @staticmethod
209
  def perspective_snapshot_export(params):
210
    """Export a given snapshot.
211

    
212
    """
213
    disk = objects.Disk.FromDict(params[0])
214
    dest_node = params[1]
215
    instance = objects.Instance.FromDict(params[2])
216
    return backend.ExportSnapshot(disk, dest_node, instance)
217

    
218
  @staticmethod
219
  def perspective_finalize_export(params):
220
    """Expose the finalize export functionality.
221

    
222
    """
223
    instance = objects.Instance.FromDict(params[0])
224
    snap_disks = [objects.Disk.FromDict(str_data)
225
                  for str_data in params[1]]
226
    return backend.FinalizeExport(instance, snap_disks)
227

    
228
  @staticmethod
229
  def perspective_export_info(params):
230
    """Query information about an existing export on this node.
231

    
232
    The given path may not contain an export, in which case we return
233
    None.
234

    
235
    """
236
    path = params[0]
237
    einfo = backend.ExportInfo(path)
238
    if einfo is None:
239
      return einfo
240
    return einfo.Dumps()
241

    
242
  @staticmethod
243
  def perspective_export_list(params):
244
    """List the available exports on this node.
245

    
246
    Note that as opposed to export_info, which may query data about an
247
    export in any path, this only queries the standard Ganeti path
248
    (constants.EXPORT_DIR).
249

    
250
    """
251
    return backend.ListExports()
252

    
253
  @staticmethod
254
  def perspective_export_remove(params):
255
    """Remove an export.
256

    
257
    """
258
    export = params[0]
259
    return backend.RemoveExport(export)
260

    
261
  # volume  --------------------------
262

    
263
  @staticmethod
264
  def perspective_volume_list(params):
265
    """Query the list of logical volumes in a given volume group.
266

    
267
    """
268
    vgname = params[0]
269
    return backend.GetVolumeList(vgname)
270

    
271
  @staticmethod
272
  def perspective_vg_list(params):
273
    """Query the list of volume groups.
274

    
275
    """
276
    return backend.ListVolumeGroups()
277

    
278
  # bridge  --------------------------
279

    
280
  @staticmethod
281
  def perspective_bridges_exist(params):
282
    """Check if all bridges given exist on this node.
283

    
284
    """
285
    bridges_list = params[0]
286
    return backend.BridgesExist(bridges_list)
287

    
288
  # instance  --------------------------
289

    
290
  @staticmethod
291
  def perspective_instance_os_add(params):
292
    """Install an OS on a given instance.
293

    
294
    """
295
    inst_s, os_disk, swap_disk = params
296
    inst = objects.Instance.FromDict(inst_s)
297
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
298

    
299
  @staticmethod
300
  def perspective_instance_run_rename(params):
301
    """Runs the OS rename script for an instance.
302

    
303
    """
304
    inst_s, old_name, os_disk, swap_disk = params
305
    inst = objects.Instance.FromDict(inst_s)
306
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
307

    
308
  @staticmethod
309
  def perspective_instance_os_import(params):
310
    """Run the import function of an OS onto a given instance.
311

    
312
    """
313
    inst_s, os_disk, swap_disk, src_node, src_image = params
314
    inst = objects.Instance.FromDict(inst_s)
315
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
316
                                        src_node, src_image)
317

    
318
  @staticmethod
319
  def perspective_instance_shutdown(params):
320
    """Shutdown an instance.
321

    
322
    """
323
    instance = objects.Instance.FromDict(params[0])
324
    return backend.ShutdownInstance(instance)
325

    
326
  @staticmethod
327
  def perspective_instance_start(params):
328
    """Start an instance.
329

    
330
    """
331
    instance = objects.Instance.FromDict(params[0])
332
    extra_args = params[1]
333
    return backend.StartInstance(instance, extra_args)
334

    
335
  @staticmethod
336
  def perspective_instance_reboot(params):
337
    """Reboot an instance.
338

    
339
    """
340
    instance = objects.Instance.FromDict(params[0])
341
    reboot_type = params[1]
342
    extra_args = params[2]
343
    return backend.RebootInstance(instance, reboot_type, extra_args)
344

    
345
  @staticmethod
346
  def perspective_instance_info(params):
347
    """Query instance information.
348

    
349
    """
350
    return backend.GetInstanceInfo(params[0])
351

    
352
  @staticmethod
353
  def perspective_all_instances_info(params):
354
    """Query information about all instances.
355

    
356
    """
357
    return backend.GetAllInstancesInfo()
358

    
359
  @staticmethod
360
  def perspective_instance_list(params):
361
    """Query the list of running instances.
362

    
363
    """
364
    return backend.GetInstanceList()
365

    
366
  # node --------------------------
367

    
368
  @staticmethod
369
  def perspective_node_tcp_ping(params):
370
    """Do a TcpPing on the remote node.
371

    
372
    """
373
    return utils.TcpPing(params[0], params[1], params[2],
374
                         timeout=params[3], live_port_needed=params[4])
375

    
376
  @staticmethod
377
  def perspective_node_info(params):
378
    """Query node information.
379

    
380
    """
381
    vgname = params[0]
382
    return backend.GetNodeInfo(vgname)
383

    
384
  @staticmethod
385
  def perspective_node_add(params):
386
    """Complete the registration of this node in the cluster.
387

    
388
    """
389
    return backend.AddNode(params[0], params[1], params[2],
390
                           params[3], params[4], params[5])
391

    
392
  @staticmethod
393
  def perspective_node_verify(params):
394
    """Run a verify sequence on this node.
395

    
396
    """
397
    return backend.VerifyNode(params[0])
398

    
399
  @staticmethod
400
  def perspective_node_start_master(params):
401
    """Promote this node to master status.
402

    
403
    """
404
    return backend.StartMaster()
405

    
406
  @staticmethod
407
  def perspective_node_stop_master(params):
408
    """Demote this node from master status.
409

    
410
    """
411
    return backend.StopMaster()
412

    
413
  @staticmethod
414
  def perspective_node_leave_cluster(params):
415
    """Cleanup after leaving a cluster.
416

    
417
    """
418
    return backend.LeaveCluster()
419

    
420
  @staticmethod
421
  def perspective_node_volumes(params):
422
    """Query the list of all logical volume groups.
423

    
424
    """
425
    return backend.NodeVolumes()
426

    
427
  # cluster --------------------------
428

    
429
  @staticmethod
430
  def perspective_version(params):
431
    """Query version information.
432

    
433
    """
434
    return constants.PROTOCOL_VERSION
435

    
436
  @staticmethod
437
  def perspective_upload_file(params):
438
    """Upload a file.
439

    
440
    Note that the backend implementation imposes strict rules on which
441
    files are accepted.
442

    
443
    """
444
    return backend.UploadFile(*params)
445

    
446

    
447
  # os -----------------------
448

    
449
  @staticmethod
450
  def perspective_os_diagnose(params):
451
    """Query detailed information about existing OSes.
452

    
453
    """
454
    os_list = backend.DiagnoseOS()
455
    if not os_list:
456
      # this catches also return values of 'False',
457
      # for which we can't iterate over
458
      return os_list
459
    result = []
460
    for data in os_list:
461
      if isinstance(data, objects.OS):
462
        result.append(data.ToDict())
463
      elif isinstance(data, errors.InvalidOS):
464
        result.append(data.args)
465
      else:
466
        raise errors.ProgrammerError("Invalid result from backend.DiagnoseOS"
467
                                     " (class %s, %s)" %
468
                                     (str(data.__class__), data))
469

    
470
    return result
471

    
472
  @staticmethod
473
  def perspective_os_get(params):
474
    """Query information about a given OS.
475

    
476
    """
477
    name = params[0]
478
    try:
479
      os_obj = backend.OSFromDisk(name).ToDict()
480
    except errors.InvalidOS, err:
481
      os_obj = err.args
482
    return os_obj
483

    
484
  # hooks -----------------------
485

    
486
  @staticmethod
487
  def perspective_hooks_runner(params):
488
    """Run hook scripts.
489

    
490
    """
491
    hpath, phase, env = params
492
    hr = backend.HooksRunner()
493
    return hr.RunHooks(hpath, phase, env)
494

    
495

    
496
class MyRealm:
497
  """Simple realm that forwards all requests to a ServerObject.
498

    
499
  """
500
  __implements__ = portal.IRealm
501

    
502
  def requestAvatar(self, avatarId, mind, *interfaces):
503
    """Return an avatar based on our ServerObject class.
504

    
505
    """
506
    if pb.IPerspective not in interfaces:
507
      raise NotImplementedError
508
    return pb.IPerspective, ServerObject(avatarId), lambda:None
509

    
510

    
511
def ParseOptions():
512
  """Parse the command line options.
513

    
514
  Returns:
515
    (options, args) as from OptionParser.parse_args()
516

    
517
  """
518
  parser = OptionParser(description="Ganeti node daemon",
519
                        usage="%prog [-f] [-d]",
520
                        version="%%prog (ganeti) %s" %
521
                        constants.RELEASE_VERSION)
522

    
523
  parser.add_option("-f", "--foreground", dest="fork",
524
                    help="Don't detach from the current terminal",
525
                    default=True, action="store_false")
526
  parser.add_option("-d", "--debug", dest="debug",
527
                    help="Enable some debug messages",
528
                    default=False, action="store_true")
529
  options, args = parser.parse_args()
530
  return options, args
531

    
532

    
533
def main():
534
  """Main function for the node daemon.
535

    
536
  """
537
  options, args = ParseOptions()
538
  for fname in (constants.SSL_CERT_FILE,):
539
    if not os.path.isfile(fname):
540
      print "config %s not there, will not run." % fname
541
      sys.exit(5)
542

    
543
  try:
544
    ss = ssconf.SimpleStore()
545
    port = ss.GetNodeDaemonPort()
546
    pwdata = ss.GetNodeDaemonPassword()
547
  except errors.ConfigurationError, err:
548
    print "Cluster configuration incomplete: '%s'" % str(err)
549
    sys.exit(5)
550

    
551
  # become a daemon
552
  if options.fork:
553
    createDaemon()
554

    
555
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
556
                      program="ganeti-noded")
557

    
558
  p = portal.Portal(MyRealm())
559
  p.registerChecker(
560
    checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
561
  reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
562
  reactor.run()
563

    
564

    
565
def createDaemon():
566
  """Detach a process from the controlling terminal and run it in the
567
  background as a daemon.
568

    
569
  """
570
  UMASK = 077
571
  WORKDIR = "/"
572
  # Default maximum for the number of available file descriptors.
573
  if 'SC_OPEN_MAX' in os.sysconf_names:
574
    try:
575
      MAXFD = os.sysconf('SC_OPEN_MAX')
576
      if MAXFD < 0:
577
        MAXFD = 1024
578
    except OSError:
579
      MAXFD = 1024
580
  else:
581
    MAXFD = 1024
582
  # The standard I/O file descriptors are redirected to /dev/null by default.
583
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
584
  REDIRECT_TO = constants.LOG_NODESERVER
585
  try:
586
    pid = os.fork()
587
  except OSError, e:
588
    raise Exception("%s [%d]" % (e.strerror, e.errno))
589
  if (pid == 0):  # The first child.
590
    os.setsid()
591
    try:
592
      pid = os.fork() # Fork a second child.
593
    except OSError, e:
594
      raise Exception("%s [%d]" % (e.strerror, e.errno))
595
    if (pid == 0):  # The second child.
596
      os.chdir(WORKDIR)
597
      os.umask(UMASK)
598
    else:
599
      # exit() or _exit()?  See below.
600
      os._exit(0) # Exit parent (the first child) of the second child.
601
  else:
602
    os._exit(0) # Exit parent of the first child.
603
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
604
  if (maxfd == resource.RLIM_INFINITY):
605
    maxfd = MAXFD
606

    
607
  # Iterate through and close all file descriptors.
608
  for fd in range(0, maxfd):
609
    try:
610
      os.close(fd)
611
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
612
      pass
613
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
614
  # Duplicate standard input to standard output and standard error.
615
  os.dup2(0, 1)     # standard output (1)
616
  os.dup2(0, 2)     # standard error (2)
617
  return(0)
618

    
619

    
620
if __name__ == '__main__':
621
  main()