Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 7d88772a

History | View | Annotate | Download (19.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import constants
39
from ganeti import objects
40
from ganeti import errors
41
from ganeti import jstore
42
from ganeti import daemon
43
from ganeti import http
44
from ganeti import utils
45

    
46
import ganeti.http.server
47

    
48

    
49
# Job queue lock object. It is None until main() stores the result of
# jstore.InitAndVerifyQueue() here; the _RequireJobQueueLock decorator
# acquires and releases it around job queue operations.
queue_lock = None
50

    
51

    
52
def _RequireJobQueueLock(fn):
  """Decorator serializing access to the job queue.

  The returned function takes the module-level C{queue_lock} in
  exclusive mode before calling C{fn} and always releases it afterwards,
  whether C{fn} returned or raised.

  @param fn: the function to protect
  @return: a function with the same call signature as C{fn}

  """
  lock_timeout = 10

  def wrapper(*args, **kwargs):
    # Several children may be running at the same time, hence the
    # exclusive, blocking acquisition; we wait at most lock_timeout
    # seconds for the lock.
    queue_lock.Exclusive(blocking=True, timeout=lock_timeout)
    try:
      result = fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()
    return result

  return wrapper
68

    
69

    
70
class NodeHttpServer(http.server.HttpServer):
71
  """The server implementation.
72

    
73
  This class holds all methods exposed over the RPC interface.
74

    
75
  """
76
  def __init__(self, *args, **kwargs):
77
    http.server.HttpServer.__init__(self, *args, **kwargs)
78
    self.noded_pid = os.getpid()
79

    
80
  def HandleRequest(self, req):
81
    """Handle a request.
82

    
83
    """
84
    if req.request_method.upper() != http.HTTP_PUT:
85
      raise http.HttpBadRequest()
86

    
87
    path = req.request_path
88
    if path.startswith("/"):
89
      path = path[1:]
90

    
91
    method = getattr(self, "perspective_%s" % path, None)
92
    if method is None:
93
      raise http.HttpNotFound()
94

    
95
    try:
96
      try:
97
        return method(req.request_body)
98
      except:
99
        logging.exception("Error in RPC call")
100
        raise
101
    except errors.QuitGanetiException, err:
102
      # Tell parent to quit
103
      os.kill(self.noded_pid, signal.SIGTERM)
104

    
105
  # the new block devices  --------------------------
106

    
107
  @staticmethod
108
  def perspective_blockdev_create(params):
109
    """Create a block device.
110

    
111
    """
112
    bdev_s, size, owner, on_primary, info = params
113
    bdev = objects.Disk.FromDict(bdev_s)
114
    if bdev is None:
115
      raise ValueError("can't unserialize data!")
116
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
117

    
118
  @staticmethod
119
  def perspective_blockdev_remove(params):
120
    """Remove a block device.
121

    
122
    """
123
    bdev_s = params[0]
124
    bdev = objects.Disk.FromDict(bdev_s)
125
    return backend.RemoveBlockDevice(bdev)
126

    
127
  @staticmethod
128
  def perspective_blockdev_rename(params):
129
    """Remove a block device.
130

    
131
    """
132
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133
    return backend.RenameBlockDevices(devlist)
134

    
135
  @staticmethod
136
  def perspective_blockdev_assemble(params):
137
    """Assemble a block device.
138

    
139
    """
140
    bdev_s, owner, on_primary = params
141
    bdev = objects.Disk.FromDict(bdev_s)
142
    if bdev is None:
143
      raise ValueError("can't unserialize data!")
144
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
145

    
146
  @staticmethod
147
  def perspective_blockdev_shutdown(params):
148
    """Shutdown a block device.
149

    
150
    """
151
    bdev_s = params[0]
152
    bdev = objects.Disk.FromDict(bdev_s)
153
    if bdev is None:
154
      raise ValueError("can't unserialize data!")
155
    return backend.ShutdownBlockDevice(bdev)
156

    
157
  @staticmethod
158
  def perspective_blockdev_addchildren(params):
159
    """Add a child to a mirror device.
160

    
161
    Note: this is only valid for mirror devices. It's the caller's duty
162
    to send a correct disk, otherwise we raise an error.
163

    
164
    """
165
    bdev_s, ndev_s = params
166
    bdev = objects.Disk.FromDict(bdev_s)
167
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168
    if bdev is None or ndevs.count(None) > 0:
169
      raise ValueError("can't unserialize data!")
170
    return backend.MirrorAddChildren(bdev, ndevs)
171

    
172
  @staticmethod
173
  def perspective_blockdev_removechildren(params):
174
    """Remove a child from a mirror device.
175

    
176
    This is only valid for mirror devices, of course. It's the callers
177
    duty to send a correct disk, otherwise we raise an error.
178

    
179
    """
180
    bdev_s, ndev_s = params
181
    bdev = objects.Disk.FromDict(bdev_s)
182
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183
    if bdev is None or ndevs.count(None) > 0:
184
      raise ValueError("can't unserialize data!")
185
    return backend.MirrorRemoveChildren(bdev, ndevs)
186

    
187
  @staticmethod
188
  def perspective_blockdev_getmirrorstatus(params):
189
    """Return the mirror status for a list of disks.
190

    
191
    """
192
    disks = [objects.Disk.FromDict(dsk_s)
193
            for dsk_s in params]
194
    return backend.GetMirrorStatus(disks)
195

    
196
  @staticmethod
197
  def perspective_blockdev_find(params):
198
    """Expose the FindBlockDevice functionality for a disk.
199

    
200
    This will try to find but not activate a disk.
201

    
202
    """
203
    disk = objects.Disk.FromDict(params[0])
204
    return backend.FindBlockDevice(disk)
205

    
206
  @staticmethod
207
  def perspective_blockdev_snapshot(params):
208
    """Create a snapshot device.
209

    
210
    Note that this is only valid for LVM disks, if we get passed
211
    something else we raise an exception. The snapshot device can be
212
    remove by calling the generic block device remove call.
213

    
214
    """
215
    cfbd = objects.Disk.FromDict(params[0])
216
    return backend.SnapshotBlockDevice(cfbd)
217

    
218
  @staticmethod
219
  def perspective_blockdev_grow(params):
220
    """Grow a stack of devices.
221

    
222
    """
223
    cfbd = objects.Disk.FromDict(params[0])
224
    amount = params[1]
225
    return backend.GrowBlockDevice(cfbd, amount)
226

    
227
  @staticmethod
228
  def perspective_blockdev_close(params):
229
    """Closes the given block devices.
230

    
231
    """
232
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
233
    return backend.CloseBlockDevices(params[0], disks)
234

    
235
  # export/import  --------------------------
236

    
237
  @staticmethod
238
  def perspective_snapshot_export(params):
239
    """Export a given snapshot.
240

    
241
    """
242
    disk = objects.Disk.FromDict(params[0])
243
    dest_node = params[1]
244
    instance = objects.Instance.FromDict(params[2])
245
    cluster_name = params[3]
246
    dev_idx = params[4]
247
    return backend.ExportSnapshot(disk, dest_node, instance,
248
                                  cluster_name, dev_idx)
249

    
250
  @staticmethod
251
  def perspective_finalize_export(params):
252
    """Expose the finalize export functionality.
253

    
254
    """
255
    instance = objects.Instance.FromDict(params[0])
256
    snap_disks = [objects.Disk.FromDict(str_data)
257
                  for str_data in params[1]]
258
    return backend.FinalizeExport(instance, snap_disks)
259

    
260
  @staticmethod
261
  def perspective_export_info(params):
262
    """Query information about an existing export on this node.
263

    
264
    The given path may not contain an export, in which case we return
265
    None.
266

    
267
    """
268
    path = params[0]
269
    einfo = backend.ExportInfo(path)
270
    if einfo is None:
271
      return einfo
272
    return einfo.Dumps()
273

    
274
  @staticmethod
275
  def perspective_export_list(params):
276
    """List the available exports on this node.
277

    
278
    Note that as opposed to export_info, which may query data about an
279
    export in any path, this only queries the standard Ganeti path
280
    (constants.EXPORT_DIR).
281

    
282
    """
283
    return backend.ListExports()
284

    
285
  @staticmethod
286
  def perspective_export_remove(params):
287
    """Remove an export.
288

    
289
    """
290
    export = params[0]
291
    return backend.RemoveExport(export)
292

    
293
  # volume  --------------------------
294

    
295
  @staticmethod
296
  def perspective_volume_list(params):
297
    """Query the list of logical volumes in a given volume group.
298

    
299
    """
300
    vgname = params[0]
301
    return backend.GetVolumeList(vgname)
302

    
303
  @staticmethod
304
  def perspective_vg_list(params):
305
    """Query the list of volume groups.
306

    
307
    """
308
    return backend.ListVolumeGroups()
309

    
310
  # bridge  --------------------------
311

    
312
  @staticmethod
313
  def perspective_bridges_exist(params):
314
    """Check if all bridges given exist on this node.
315

    
316
    """
317
    bridges_list = params[0]
318
    return backend.BridgesExist(bridges_list)
319

    
320
  # instance  --------------------------
321

    
322
  @staticmethod
323
  def perspective_instance_os_add(params):
324
    """Install an OS on a given instance.
325

    
326
    """
327
    inst_s = params[0]
328
    inst = objects.Instance.FromDict(inst_s)
329
    return backend.AddOSToInstance(inst)
330

    
331
  @staticmethod
332
  def perspective_instance_run_rename(params):
333
    """Runs the OS rename script for an instance.
334

    
335
    """
336
    inst_s, old_name = params
337
    inst = objects.Instance.FromDict(inst_s)
338
    return backend.RunRenameInstance(inst, old_name)
339

    
340
  @staticmethod
341
  def perspective_instance_os_import(params):
342
    """Run the import function of an OS onto a given instance.
343

    
344
    """
345
    inst_s, src_node, src_images, cluster_name = params
346
    inst = objects.Instance.FromDict(inst_s)
347
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
348
                                        cluster_name)
349

    
350
  @staticmethod
351
  def perspective_instance_shutdown(params):
352
    """Shutdown an instance.
353

    
354
    """
355
    instance = objects.Instance.FromDict(params[0])
356
    return backend.ShutdownInstance(instance)
357

    
358
  @staticmethod
359
  def perspective_instance_start(params):
360
    """Start an instance.
361

    
362
    """
363
    instance = objects.Instance.FromDict(params[0])
364
    extra_args = params[1]
365
    return backend.StartInstance(instance, extra_args)
366

    
367
  @staticmethod
368
  def perspective_instance_migrate(params):
369
    """Migrates an instance.
370

    
371
    """
372
    instance, target, live = params
373
    instance = objects.Instance.FromDict(instance)
374
    return backend.MigrateInstance(instance, target, live)
375

    
376
  @staticmethod
377
  def perspective_instance_reboot(params):
378
    """Reboot an instance.
379

    
380
    """
381
    instance = objects.Instance.FromDict(params[0])
382
    reboot_type = params[1]
383
    extra_args = params[2]
384
    return backend.RebootInstance(instance, reboot_type, extra_args)
385

    
386
  @staticmethod
387
  def perspective_instance_info(params):
388
    """Query instance information.
389

    
390
    """
391
    return backend.GetInstanceInfo(params[0], params[1])
392

    
393
  @staticmethod
394
  def perspective_instance_migratable(params):
395
    """Query whether the specified instance can be migrated.
396

    
397
    """
398
    instance = objects.Instance.FromDict(params[0])
399
    return backend.GetInstanceMigratable(instance)
400

    
401
  @staticmethod
402
  def perspective_all_instances_info(params):
403
    """Query information about all instances.
404

    
405
    """
406
    return backend.GetAllInstancesInfo(params[0])
407

    
408
  @staticmethod
409
  def perspective_instance_list(params):
410
    """Query the list of running instances.
411

    
412
    """
413
    return backend.GetInstanceList(params[0])
414

    
415
  # node --------------------------
416

    
417
  @staticmethod
418
  def perspective_node_tcp_ping(params):
419
    """Do a TcpPing on the remote node.
420

    
421
    """
422
    return utils.TcpPing(params[1], params[2], timeout=params[3],
423
                         live_port_needed=params[4], source=params[0])
424

    
425
  @staticmethod
426
  def perspective_node_has_ip_address(params):
427
    """Checks if a node has the given ip address.
428

    
429
    """
430
    return utils.OwnIpAddress(params[0])
431

    
432
  @staticmethod
433
  def perspective_node_info(params):
434
    """Query node information.
435

    
436
    """
437
    vgname, hypervisor_type = params
438
    return backend.GetNodeInfo(vgname, hypervisor_type)
439

    
440
  @staticmethod
441
  def perspective_node_add(params):
442
    """Complete the registration of this node in the cluster.
443

    
444
    """
445
    return backend.AddNode(params[0], params[1], params[2],
446
                           params[3], params[4], params[5])
447

    
448
  @staticmethod
449
  def perspective_node_verify(params):
450
    """Run a verify sequence on this node.
451

    
452
    """
453
    return backend.VerifyNode(params[0], params[1])
454

    
455
  @staticmethod
456
  def perspective_node_start_master(params):
457
    """Promote this node to master status.
458

    
459
    """
460
    return backend.StartMaster(params[0])
461

    
462
  @staticmethod
463
  def perspective_node_stop_master(params):
464
    """Demote this node from master status.
465

    
466
    """
467
    return backend.StopMaster(params[0])
468

    
469
  @staticmethod
470
  def perspective_node_leave_cluster(params):
471
    """Cleanup after leaving a cluster.
472

    
473
    """
474
    return backend.LeaveCluster()
475

    
476
  @staticmethod
477
  def perspective_node_volumes(params):
478
    """Query the list of all logical volume groups.
479

    
480
    """
481
    return backend.NodeVolumes()
482

    
483
  @staticmethod
484
  def perspective_node_demote_from_mc(params):
485
    """Demote a node from the master candidate role.
486

    
487
    """
488
    return backend.DemoteFromMC()
489

    
490

    
491
  # cluster --------------------------
492

    
493
  @staticmethod
494
  def perspective_version(params):
495
    """Query version information.
496

    
497
    """
498
    return constants.PROTOCOL_VERSION
499

    
500
  @staticmethod
501
  def perspective_upload_file(params):
502
    """Upload a file.
503

    
504
    Note that the backend implementation imposes strict rules on which
505
    files are accepted.
506

    
507
    """
508
    return backend.UploadFile(*params)
509

    
510
  @staticmethod
511
  def perspective_master_info(params):
512
    """Query master information.
513

    
514
    """
515
    return backend.GetMasterInfo()
516

    
517
  @staticmethod
518
  def perspective_write_ssconf_files(params):
519
    """Write ssconf files.
520

    
521
    """
522
    (values,) = params
523
    return backend.WriteSsconfFiles(values)
524

    
525
  # os -----------------------
526

    
527
  @staticmethod
528
  def perspective_os_diagnose(params):
529
    """Query detailed information about existing OSes.
530

    
531
    """
532
    return [os.ToDict() for os in backend.DiagnoseOS()]
533

    
534
  @staticmethod
535
  def perspective_os_get(params):
536
    """Query information about a given OS.
537

    
538
    """
539
    name = params[0]
540
    try:
541
      os_obj = backend.OSFromDisk(name)
542
    except errors.InvalidOS, err:
543
      os_obj = objects.OS.FromInvalidOS(err)
544
    return os_obj.ToDict()
545

    
546
  # hooks -----------------------
547

    
548
  @staticmethod
549
  def perspective_hooks_runner(params):
550
    """Run hook scripts.
551

    
552
    """
553
    hpath, phase, env = params
554
    hr = backend.HooksRunner()
555
    return hr.RunHooks(hpath, phase, env)
556

    
557
  # iallocator -----------------
558

    
559
  @staticmethod
560
  def perspective_iallocator_runner(params):
561
    """Run an iallocator script.
562

    
563
    """
564
    name, idata = params
565
    iar = backend.IAllocatorRunner()
566
    return iar.Run(name, idata)
567

    
568
  # test -----------------------
569

    
570
  @staticmethod
571
  def perspective_test_delay(params):
572
    """Run test delay.
573

    
574
    """
575
    duration = params[0]
576
    return utils.TestDelay(duration)
577

    
578
  # file storage ---------------
579

    
580
  @staticmethod
581
  def perspective_file_storage_dir_create(params):
582
    """Create the file storage directory.
583

    
584
    """
585
    file_storage_dir = params[0]
586
    return backend.CreateFileStorageDir(file_storage_dir)
587

    
588
  @staticmethod
589
  def perspective_file_storage_dir_remove(params):
590
    """Remove the file storage directory.
591

    
592
    """
593
    file_storage_dir = params[0]
594
    return backend.RemoveFileStorageDir(file_storage_dir)
595

    
596
  @staticmethod
597
  def perspective_file_storage_dir_rename(params):
598
    """Rename the file storage directory.
599

    
600
    """
601
    old_file_storage_dir = params[0]
602
    new_file_storage_dir = params[1]
603
    return backend.RenameFileStorageDir(old_file_storage_dir,
604
                                        new_file_storage_dir)
605

    
606
  # jobs ------------------------
607

    
608
  @staticmethod
609
  @_RequireJobQueueLock
610
  def perspective_jobqueue_update(params):
611
    """Update job queue.
612

    
613
    """
614
    (file_name, content) = params
615
    return backend.JobQueueUpdate(file_name, content)
616

    
617
  @staticmethod
618
  @_RequireJobQueueLock
619
  def perspective_jobqueue_purge(params):
620
    """Purge job queue.
621

    
622
    """
623
    return backend.JobQueuePurge()
624

    
625
  @staticmethod
626
  @_RequireJobQueueLock
627
  def perspective_jobqueue_rename(params):
628
    """Rename a job queue file.
629

    
630
    """
631
    # TODO: What if a file fails to rename?
632
    return [backend.JobQueueRename(old, new) for old, new in params]
633

    
634
  @staticmethod
635
  def perspective_jobqueue_set_drain(params):
636
    """Set/unset the queue drain flag.
637

    
638
    """
639
    drain_flag = params[0]
640
    return backend.JobQueueSetDrainFlag(drain_flag)
641

    
642

    
643
  # hypervisor ---------------
644

    
645
  @staticmethod
646
  def perspective_hypervisor_validate_params(params):
647
    """Validate the hypervisor parameters.
648

    
649
    """
650
    (hvname, hvparams) = params
651
    return backend.ValidateHVParams(hvname, hvparams)
652

    
653

    
654
def ParseOptions():
  """Build the option parser and parse the command line.

  Two flags are defined: -f/--foreground, which clears C{options.fork}
  (True by default), and -d/--debug, which sets C{options.debug}
  (False by default).

  @return: (options, args) as from OptionParser.parse_args()

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(version=version_text,
                        usage="%prog [-f] [-d]",
                        description="Ganeti node daemon")

  parser.add_option("-f", "--foreground",
                    action="store_false", dest="fork", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug",
                    action="store_true", dest="debug", default=False,
                    help="Enable some debug messages")

  return parser.parse_args()
673

    
674

    
675
def EnsureRuntimeEnvironment():
676
  """Ensure our run-time environment is complete.
677

    
678
  Currently this creates directories which could be missing, either
679
  due to directories being on a tmpfs mount, or due to incomplete
680
  packaging.
681

    
682
  """
683
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
684
  dirs.append((constants.LOG_OS_DIR, 0750))
685
  for dir_name, dir_mode in dirs:
686
    if not os.path.exists(dir_name):
687
      try:
688
        os.mkdir(dir_name, dir_mode)
689
      except EnvironmentError, err:
690
        if err.errno != errno.EEXIST:
691
          print ("Node setup wrong, cannot create directory '%s': %s" %
692
                 (dir_name, err))
693
          sys.exit(5)
694
    if not os.path.isdir(dir_name):
695
      print ("Node setup wrong, '%s' is not a directory" % dir_name)
696
      sys.exit(5)
697

    
698

    
699
def main():
  """Main function for the node daemon.

  Parses the command line, checks that the SSL certificate file exists
  and that the node daemon port can be determined, makes sure the
  run-time directories exist, detaches from the terminal unless running
  in the foreground, and then serves RPC requests until the main loop
  returns. The process exits with status 5 on a failed precondition.

  """
  global queue_lock

  options, args = ParseOptions()
  utils.debug = options.debug

  if options.fork:
    utils.CloseFDs()

  # Refuse to start without the SSL certificate file
  for fname in (constants.SSL_CERT_FILE,):
    if not os.path.isfile(fname):
      print "config %s not there, will not run." % fname
      sys.exit(5)

  try:
    port = utils.GetNodeDaemonPort()
  except errors.ConfigurationError, err:
    print "Cluster configuration incomplete: '%s'" % str(err)
    sys.exit(5)

  EnsureRuntimeEnvironment()

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_NODESERVER)

  # The PID file is written only after the optional daemonization and is
  # removed again in the outer "finally" below
  utils.WritePidFile(constants.NODED_PID)
  try:
    # Log to stderr as well when running in the foreground
    utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
                       stderr_logging=not options.fork)
    logging.info("ganeti node daemon startup")

    # Read SSL certificate
    # (the same file is given as both the key path and the certificate
    # path)
    ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
                                    ssl_cert_path=constants.SSL_CERT_FILE)

    # Prepare job queue
    # (the returned lock is stored in the module-level queue_lock, which
    # _RequireJobQueueLock uses)
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

    mainloop = daemon.Mainloop()
    server = NodeHttpServer(mainloop, "", port,
                            ssl_params=ssl_params, ssl_verify_peer=True)
    server.Start()
    try:
      mainloop.Run()
    finally:
      # Stop the server however the main loop ended
      server.Stop()
  finally:
    utils.RemovePidFile(constants.NODED_PID)
751

    
752

    
753
# Start the daemon only when this file is executed as a script
if __name__ == '__main__':
  main()