Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ aa9075c5

History | View | Annotate | Download (16.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import ssconf
43
from ganeti import http
44
from ganeti import utils
45

    
46

    
47
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
48
  """The server implementation.
49

    
50
  This class holds all methods exposed over the RPC interface.
51

    
52
  """
53
  def HandleRequest(self):
54
    """Handle a request.
55

    
56
    """
57
    if self.command.upper() != "PUT":
58
      raise http.HTTPBadRequest()
59

    
60
    path = self.path
61
    if path.startswith("/"):
62
      path = path[1:]
63

    
64
    method = getattr(self, "perspective_%s" % path, None)
65
    if method is None:
66
      raise httperror.HTTPNotFound()
67

    
68
    try:
69
      try:
70
        return method(self.post_data)
71
      except:
72
        logging.exception("Error in RPC call")
73
        raise
74
    except errors.QuitGanetiException, err:
75
      # Tell parent to quit
76
      os.kill(self.server.noded_pid, signal.SIGTERM)
77

    
78
  # the new block devices  --------------------------
79

    
80
  @staticmethod
81
  def perspective_blockdev_create(params):
82
    """Create a block device.
83

    
84
    """
85
    bdev_s, size, owner, on_primary, info = params
86
    bdev = objects.Disk.FromDict(bdev_s)
87
    if bdev is None:
88
      raise ValueError("can't unserialize data!")
89
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
90

    
91
  @staticmethod
92
  def perspective_blockdev_remove(params):
93
    """Remove a block device.
94

    
95
    """
96
    bdev_s = params[0]
97
    bdev = objects.Disk.FromDict(bdev_s)
98
    return backend.RemoveBlockDevice(bdev)
99

    
100
  @staticmethod
101
  def perspective_blockdev_rename(params):
102
    """Remove a block device.
103

    
104
    """
105
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
106
    return backend.RenameBlockDevices(devlist)
107

    
108
  @staticmethod
109
  def perspective_blockdev_assemble(params):
110
    """Assemble a block device.
111

    
112
    """
113
    bdev_s, owner, on_primary = params
114
    bdev = objects.Disk.FromDict(bdev_s)
115
    if bdev is None:
116
      raise ValueError("can't unserialize data!")
117
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
118

    
119
  @staticmethod
120
  def perspective_blockdev_shutdown(params):
121
    """Shutdown a block device.
122

    
123
    """
124
    bdev_s = params[0]
125
    bdev = objects.Disk.FromDict(bdev_s)
126
    if bdev is None:
127
      raise ValueError("can't unserialize data!")
128
    return backend.ShutdownBlockDevice(bdev)
129

    
130
  @staticmethod
131
  def perspective_blockdev_addchildren(params):
132
    """Add a child to a mirror device.
133

    
134
    Note: this is only valid for mirror devices. It's the caller's duty
135
    to send a correct disk, otherwise we raise an error.
136

    
137
    """
138
    bdev_s, ndev_s = params
139
    bdev = objects.Disk.FromDict(bdev_s)
140
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
141
    if bdev is None or ndevs.count(None) > 0:
142
      raise ValueError("can't unserialize data!")
143
    return backend.MirrorAddChildren(bdev, ndevs)
144

    
145
  @staticmethod
146
  def perspective_blockdev_removechildren(params):
147
    """Remove a child from a mirror device.
148

    
149
    This is only valid for mirror devices, of course. It's the callers
150
    duty to send a correct disk, otherwise we raise an error.
151

    
152
    """
153
    bdev_s, ndev_s = params
154
    bdev = objects.Disk.FromDict(bdev_s)
155
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156
    if bdev is None or ndevs.count(None) > 0:
157
      raise ValueError("can't unserialize data!")
158
    return backend.MirrorRemoveChildren(bdev, ndevs)
159

    
160
  @staticmethod
161
  def perspective_blockdev_getmirrorstatus(params):
162
    """Return the mirror status for a list of disks.
163

    
164
    """
165
    disks = [objects.Disk.FromDict(dsk_s)
166
            for dsk_s in params]
167
    return backend.GetMirrorStatus(disks)
168

    
169
  @staticmethod
170
  def perspective_blockdev_find(params):
171
    """Expose the FindBlockDevice functionality for a disk.
172

    
173
    This will try to find but not activate a disk.
174

    
175
    """
176
    disk = objects.Disk.FromDict(params[0])
177
    return backend.FindBlockDevice(disk)
178

    
179
  @staticmethod
180
  def perspective_blockdev_snapshot(params):
181
    """Create a snapshot device.
182

    
183
    Note that this is only valid for LVM disks, if we get passed
184
    something else we raise an exception. The snapshot device can be
185
    remove by calling the generic block device remove call.
186

    
187
    """
188
    cfbd = objects.Disk.FromDict(params[0])
189
    return backend.SnapshotBlockDevice(cfbd)
190

    
191
  @staticmethod
192
  def perspective_blockdev_grow(params):
193
    """Grow a stack of devices.
194

    
195
    """
196
    cfbd = objects.Disk.FromDict(params[0])
197
    amount = params[1]
198
    return backend.GrowBlockDevice(cfbd, amount)
199

    
200
  @staticmethod
201
  def perspective_blockdev_close(params):
202
    """Closes the given block devices.
203

    
204
    """
205
    disks = [objects.Disk.FromDict(cf) for cf in params]
206
    return backend.CloseBlockDevices(disks)
207

    
208
  # export/import  --------------------------
209

    
210
  @staticmethod
211
  def perspective_snapshot_export(params):
212
    """Export a given snapshot.
213

    
214
    """
215
    disk = objects.Disk.FromDict(params[0])
216
    dest_node = params[1]
217
    instance = objects.Instance.FromDict(params[2])
218
    return backend.ExportSnapshot(disk, dest_node, instance)
219

    
220
  @staticmethod
221
  def perspective_finalize_export(params):
222
    """Expose the finalize export functionality.
223

    
224
    """
225
    instance = objects.Instance.FromDict(params[0])
226
    snap_disks = [objects.Disk.FromDict(str_data)
227
                  for str_data in params[1]]
228
    return backend.FinalizeExport(instance, snap_disks)
229

    
230
  @staticmethod
231
  def perspective_export_info(params):
232
    """Query information about an existing export on this node.
233

    
234
    The given path may not contain an export, in which case we return
235
    None.
236

    
237
    """
238
    path = params[0]
239
    einfo = backend.ExportInfo(path)
240
    if einfo is None:
241
      return einfo
242
    return einfo.Dumps()
243

    
244
  @staticmethod
245
  def perspective_export_list(params):
246
    """List the available exports on this node.
247

    
248
    Note that as opposed to export_info, which may query data about an
249
    export in any path, this only queries the standard Ganeti path
250
    (constants.EXPORT_DIR).
251

    
252
    """
253
    return backend.ListExports()
254

    
255
  @staticmethod
256
  def perspective_export_remove(params):
257
    """Remove an export.
258

    
259
    """
260
    export = params[0]
261
    return backend.RemoveExport(export)
262

    
263
  # volume  --------------------------
264

    
265
  @staticmethod
266
  def perspective_volume_list(params):
267
    """Query the list of logical volumes in a given volume group.
268

    
269
    """
270
    vgname = params[0]
271
    return backend.GetVolumeList(vgname)
272

    
273
  @staticmethod
274
  def perspective_vg_list(params):
275
    """Query the list of volume groups.
276

    
277
    """
278
    return backend.ListVolumeGroups()
279

    
280
  # bridge  --------------------------
281

    
282
  @staticmethod
283
  def perspective_bridges_exist(params):
284
    """Check if all bridges given exist on this node.
285

    
286
    """
287
    bridges_list = params[0]
288
    return backend.BridgesExist(bridges_list)
289

    
290
  # instance  --------------------------
291

    
292
  @staticmethod
293
  def perspective_instance_os_add(params):
294
    """Install an OS on a given instance.
295

    
296
    """
297
    inst_s, os_disk, swap_disk = params
298
    inst = objects.Instance.FromDict(inst_s)
299
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
300

    
301
  @staticmethod
302
  def perspective_instance_run_rename(params):
303
    """Runs the OS rename script for an instance.
304

    
305
    """
306
    inst_s, old_name, os_disk, swap_disk = params
307
    inst = objects.Instance.FromDict(inst_s)
308
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
309

    
310
  @staticmethod
311
  def perspective_instance_os_import(params):
312
    """Run the import function of an OS onto a given instance.
313

    
314
    """
315
    inst_s, os_disk, swap_disk, src_node, src_image = params
316
    inst = objects.Instance.FromDict(inst_s)
317
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
318
                                        src_node, src_image)
319

    
320
  @staticmethod
321
  def perspective_instance_shutdown(params):
322
    """Shutdown an instance.
323

    
324
    """
325
    instance = objects.Instance.FromDict(params[0])
326
    return backend.ShutdownInstance(instance)
327

    
328
  @staticmethod
329
  def perspective_instance_start(params):
330
    """Start an instance.
331

    
332
    """
333
    instance = objects.Instance.FromDict(params[0])
334
    extra_args = params[1]
335
    return backend.StartInstance(instance, extra_args)
336

    
337
  @staticmethod
338
  def perspective_instance_migrate(params):
339
    """Migrates an instance.
340

    
341
    """
342
    instance, target, live = params
343
    return backend.MigrateInstance(instance, target, live)
344

    
345
  @staticmethod
346
  def perspective_instance_reboot(params):
347
    """Reboot an instance.
348

    
349
    """
350
    instance = objects.Instance.FromDict(params[0])
351
    reboot_type = params[1]
352
    extra_args = params[2]
353
    return backend.RebootInstance(instance, reboot_type, extra_args)
354

    
355
  @staticmethod
356
  def perspective_instance_info(params):
357
    """Query instance information.
358

    
359
    """
360
    return backend.GetInstanceInfo(params[0])
361

    
362
  @staticmethod
363
  def perspective_all_instances_info(params):
364
    """Query information about all instances.
365

    
366
    """
367
    return backend.GetAllInstancesInfo()
368

    
369
  @staticmethod
370
  def perspective_instance_list(params):
371
    """Query the list of running instances.
372

    
373
    """
374
    return backend.GetInstanceList()
375

    
376
  # node --------------------------
377

    
378
  @staticmethod
379
  def perspective_node_tcp_ping(params):
380
    """Do a TcpPing on the remote node.
381

    
382
    """
383
    return utils.TcpPing(params[1], params[2], timeout=params[3],
384
                         live_port_needed=params[4], source=params[0])
385

    
386
  @staticmethod
387
  def perspective_node_info(params):
388
    """Query node information.
389

    
390
    """
391
    vgname = params[0]
392
    return backend.GetNodeInfo(vgname)
393

    
394
  @staticmethod
395
  def perspective_node_add(params):
396
    """Complete the registration of this node in the cluster.
397

    
398
    """
399
    return backend.AddNode(params[0], params[1], params[2],
400
                           params[3], params[4], params[5])
401

    
402
  @staticmethod
403
  def perspective_node_verify(params):
404
    """Run a verify sequence on this node.
405

    
406
    """
407
    return backend.VerifyNode(params[0])
408

    
409
  @staticmethod
410
  def perspective_node_start_master(params):
411
    """Promote this node to master status.
412

    
413
    """
414
    return backend.StartMaster(params[0])
415

    
416
  @staticmethod
417
  def perspective_node_stop_master(params):
418
    """Demote this node from master status.
419

    
420
    """
421
    return backend.StopMaster(params[0])
422

    
423
  @staticmethod
424
  def perspective_node_leave_cluster(params):
425
    """Cleanup after leaving a cluster.
426

    
427
    """
428
    return backend.LeaveCluster()
429

    
430
  @staticmethod
431
  def perspective_node_volumes(params):
432
    """Query the list of all logical volume groups.
433

    
434
    """
435
    return backend.NodeVolumes()
436

    
437
  # cluster --------------------------
438

    
439
  @staticmethod
440
  def perspective_version(params):
441
    """Query version information.
442

    
443
    """
444
    return constants.PROTOCOL_VERSION
445

    
446
  @staticmethod
447
  def perspective_upload_file(params):
448
    """Upload a file.
449

    
450
    Note that the backend implementation imposes strict rules on which
451
    files are accepted.
452

    
453
    """
454
    return backend.UploadFile(*params)
455

    
456

    
457
  # os -----------------------
458

    
459
  @staticmethod
460
  def perspective_os_diagnose(params):
461
    """Query detailed information about existing OSes.
462

    
463
    """
464
    return [os.ToDict() for os in backend.DiagnoseOS()]
465

    
466
  @staticmethod
467
  def perspective_os_get(params):
468
    """Query information about a given OS.
469

    
470
    """
471
    name = params[0]
472
    try:
473
      os_obj = backend.OSFromDisk(name)
474
    except errors.InvalidOS, err:
475
      os_obj = objects.OS.FromInvalidOS(err)
476
    return os_obj.ToDict()
477

    
478
  # hooks -----------------------
479

    
480
  @staticmethod
481
  def perspective_hooks_runner(params):
482
    """Run hook scripts.
483

    
484
    """
485
    hpath, phase, env = params
486
    hr = backend.HooksRunner()
487
    return hr.RunHooks(hpath, phase, env)
488

    
489
  # iallocator -----------------
490

    
491
  @staticmethod
492
  def perspective_iallocator_runner(params):
493
    """Run an iallocator script.
494

    
495
    """
496
    name, idata = params
497
    iar = backend.IAllocatorRunner()
498
    return iar.Run(name, idata)
499

    
500
  # test -----------------------
501

    
502
  @staticmethod
503
  def perspective_test_delay(params):
504
    """Run test delay.
505

    
506
    """
507
    duration = params[0]
508
    return utils.TestDelay(duration)
509

    
510
  @staticmethod
511
  def perspective_file_storage_dir_create(params):
512
    """Create the file storage directory.
513

    
514
    """
515
    file_storage_dir = params[0]
516
    return backend.CreateFileStorageDir(file_storage_dir)
517

    
518
  @staticmethod
519
  def perspective_file_storage_dir_remove(params):
520
    """Remove the file storage directory.
521

    
522
    """
523
    file_storage_dir = params[0]
524
    return backend.RemoveFileStorageDir(file_storage_dir)
525

    
526
  @staticmethod
527
  def perspective_file_storage_dir_rename(params):
528
    """Rename the file storage directory.
529

    
530
    """
531
    old_file_storage_dir = params[0]
532
    new_file_storage_dir = params[1]
533
    return backend.RenameFileStorageDir(old_file_storage_dir,
534
                                        new_file_storage_dir)
535

    
536
  @staticmethod
537
  def perspective_jobqueue_update(params):
538
    """Update job queue.
539

    
540
    """
541
    (file_name, content) = params
542
    return backend.JobQueueUpdate(file_name, content)
543

    
544
  @staticmethod
545
  def perspective_jobqueue_purge(params):
546
    """Purge job queue.
547

    
548
    """
549
    return backend.JobQueuePurge()
550

    
551

    
552
class NodeDaemonHttpServer(http.HTTPServer):
553
  def __init__(self, server_address):
554
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
555
    self.noded_pid = os.getpid()
556

    
557
  def serve_forever(self):
558
    """Handle requests until told to quit."""
559
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
560
    try:
561
      while not sighandler.called:
562
        self.handle_request()
563
      # TODO: There could be children running at this point
564
    finally:
565
      sighandler.Reset()
566

    
567

    
568
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
569
  """Forking HTTP Server.
570

    
571
  This inherits from ForkingMixIn and HTTPServer in order to fork for each
572
  request we handle. This allows more requests to be handled concurrently.
573

    
574
  """
575

    
576

    
577
def ParseOptions():
578
  """Parse the command line options.
579

    
580
  Returns:
581
    (options, args) as from OptionParser.parse_args()
582

    
583
  """
584
  parser = OptionParser(description="Ganeti node daemon",
585
                        usage="%prog [-f] [-d]",
586
                        version="%%prog (ganeti) %s" %
587
                        constants.RELEASE_VERSION)
588

    
589
  parser.add_option("-f", "--foreground", dest="fork",
590
                    help="Don't detach from the current terminal",
591
                    default=True, action="store_false")
592
  parser.add_option("-d", "--debug", dest="debug",
593
                    help="Enable some debug messages",
594
                    default=False, action="store_true")
595
  options, args = parser.parse_args()
596
  return options, args
597

    
598

    
599
def main():
600
  """Main function for the node daemon.
601

    
602
  """
603
  options, args = ParseOptions()
604
  utils.debug = options.debug
605
  for fname in (constants.SSL_CERT_FILE,):
606
    if not os.path.isfile(fname):
607
      print "config %s not there, will not run." % fname
608
      sys.exit(5)
609

    
610
  try:
611
    ss = ssconf.SimpleStore()
612
    port = ss.GetNodeDaemonPort()
613
    pwdata = ss.GetNodeDaemonPassword()
614
  except errors.ConfigurationError, err:
615
    print "Cluster configuration incomplete: '%s'" % str(err)
616
    sys.exit(5)
617

    
618
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
619
  # situation where RUN_DIR is tmpfs
620
  for dir_name in constants.SUB_RUN_DIRS:
621
    if not os.path.exists(dir_name):
622
      try:
623
        os.mkdir(dir_name, 0755)
624
      except EnvironmentError, err:
625
        if err.errno != errno.EEXIST:
626
          print ("Node setup wrong, cannot create directory %s: %s" %
627
                 (dir_name, err))
628
          sys.exit(5)
629
    if not os.path.isdir(dir_name):
630
      print ("Node setup wrong, %s is not a directory" % dir_name)
631
      sys.exit(5)
632

    
633
  # become a daemon
634
  if options.fork:
635
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
636

    
637
  utils.WritePidFile(constants.NODED_PID)
638

    
639
  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
640
                      stderr_logging=not options.fork)
641
  logging.info("ganeti node daemon startup")
642

    
643
  if options.fork:
644
    server = ForkingHTTPServer(('', port))
645
  else:
646
    server = NodeDaemonHttpServer(('', port))
647

    
648
  try:
649
    server.serve_forever()
650
  finally:
651
    utils.RemovePidFile(constants.NODED_PID)
652

    
653

    
654
if __name__ == '__main__':
655
  main()