#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# functions in this module need to have a given name structure, so:
# pylint: disable-msg=C0103

import os
import sys
import traceback
import SocketServer
import errno
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils

import ganeti.http.server


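# Global lock serializing access to the job queue files; it is initialized in
# main() via jstore.InitAndVerifyQueue() and acquired by the
# _RequireJobQueueLock decorator below.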
queue_lock = None


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper


class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

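    # Dispatch on the URL path: a PUT to /foo is handled by the
    # perspective_foo method (e.g. /instance_list -> perspective_instance_list)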
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      return method(req.request_body)
    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      return (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      return err.args
    except:
      logging.exception("Error in RPC call")
      raise

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.
184

    
185
    This is only valid for mirror devices, of course. It's the callers
186
    duty to send a correct disk, otherwise we raise an error.
187

    
188
    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return backend.BlockdevGetmirrorstatus(disks)

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])
    return backend.BlockdevFind(disk)

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.
218

    
219
    Note that this is only valid for LVM disks, if we get passed
220
    something else we raise an exception. The snapshot device can be
221
    remove by calling the generic block device remove call.
222

    
223
    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    dev_idx = params[4]
    return backend.ExportSnapshot(disk, dest_node, instance,
                                  cluster_name, dev_idx)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])
    snap_disks = [objects.Disk.FromDict(str_data)
                  for str_data in params[1]]
    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_volume_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return True, backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    return backend.InstanceOsAdd(inst, reinstall)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    """
    inst_s, src_node, src_images, cluster_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
                                        cluster_name)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.InstanceShutdown(instance)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    return backend.InstanceReboot(instance, reboot_type)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return True, backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return True, utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster()

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volumes on this node.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()


  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)


  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return True, constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    try:
      os_obj = backend.OSFromDisk(name)
    except errors.InvalidOS, err:
      os_obj = objects.OS.FromInvalidOS(err)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    return utils.TestDelay(duration)

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------
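  # The update, purge and rename RPCs below run under _RequireJobQueueLock,
  # so concurrent children serialize their access to the job queue files.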

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)


  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("-b", "--bind", dest="bind_address",
                    help="Bind address",
                    default="", metavar="ADDRESS")

  options, args = parser.parse_args()
  return options, args


def main():
  """Main function for the node daemon.

  """
  global queue_lock

  options, args = ParseOptions()
  utils.debug = options.debug

  if options.fork:
    utils.CloseFDs()

  for fname in (constants.SSL_CERT_FILE,):
    if not os.path.isfile(fname):
      print "config %s not there, will not run." % fname
      sys.exit(5)

  try:
    port = utils.GetNodeDaemonPort()
  except errors.ConfigurationError, err:
    print "Cluster configuration incomplete: '%s'" % str(err)
    sys.exit(5)

  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
  dirs.append((constants.LOG_OS_DIR, 0750))
  dirs.append((constants.LOCK_DIR, 01777))
  utils.EnsureDirs(dirs)

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_NODESERVER)

  utils.WritePidFile(constants.NODED_PID)
  try:
    utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
                       stderr_logging=not options.fork)
    logging.info("ganeti node daemon startup")

    # Read SSL certificate
    ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
                                    ssl_cert_path=constants.SSL_CERT_FILE)

    # Prepare job queue
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

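    # Serve RPC requests over SSL on the node daemon port; the cluster
    # certificate file provides both the key and the certificate, and
    # peer verification is enabled (ssl_verify_peer=True).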
    mainloop = daemon.Mainloop()
    server = NodeHttpServer(mainloop, options.bind_address, port,
                            ssl_params=ssl_params, ssl_verify_peer=True)
    server.Start()
    try:
      mainloop.Run()
    finally:
      server.Stop()
  finally:
    utils.RemovePidFile(constants.NODED_PID)


if __name__ == '__main__':
  main()