Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 968a7623

History | View | Annotate | Download (20.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import constants
39
from ganeti import objects
40
from ganeti import errors
41
from ganeti import jstore
42
from ganeti import daemon
43
from ganeti import http
44
from ganeti import utils
45

    
46
import ganeti.http.server
47

    
48

    
49
queue_lock = None
50

    
51

    
52
def _RequireJobQueueLock(fn):
53
  """Decorator for job queue manipulating functions.
54

    
55
  """
56
  QUEUE_LOCK_TIMEOUT = 10
57

    
58
  def wrapper(*args, **kwargs):
59
    # Locking in exclusive, blocking mode because there could be several
60
    # children running at the same time. Waiting up to 10 seconds.
61
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62
    try:
63
      return fn(*args, **kwargs)
64
    finally:
65
      queue_lock.Unlock()
66

    
67
  return wrapper
68

    
69

    
70
class NodeHttpServer(http.server.HttpServer):
71
  """The server implementation.
72

    
73
  This class holds all methods exposed over the RPC interface.
74

    
75
  """
76
  def __init__(self, *args, **kwargs):
77
    http.server.HttpServer.__init__(self, *args, **kwargs)
78
    self.noded_pid = os.getpid()
79

    
80
  def HandleRequest(self, req):
81
    """Handle a request.
82

    
83
    """
84
    if req.request_method.upper() != http.HTTP_PUT:
85
      raise http.HttpBadRequest()
86

    
87
    path = req.request_path
88
    if path.startswith("/"):
89
      path = path[1:]
90

    
91
    method = getattr(self, "perspective_%s" % path, None)
92
    if method is None:
93
      raise http.HttpNotFound()
94

    
95
    try:
96
      try:
97
        return method(req.request_body)
98
      except:
99
        logging.exception("Error in RPC call")
100
        raise
101
    except errors.QuitGanetiException, err:
102
      # Tell parent to quit
103
      os.kill(self.noded_pid, signal.SIGTERM)
104

    
105
  # the new block devices  --------------------------
106

    
107
  @staticmethod
108
  def perspective_blockdev_create(params):
109
    """Create a block device.
110

    
111
    """
112
    bdev_s, size, owner, on_primary, info = params
113
    bdev = objects.Disk.FromDict(bdev_s)
114
    if bdev is None:
115
      raise ValueError("can't unserialize data!")
116
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
117

    
118
  @staticmethod
119
  def perspective_blockdev_remove(params):
120
    """Remove a block device.
121

    
122
    """
123
    bdev_s = params[0]
124
    bdev = objects.Disk.FromDict(bdev_s)
125
    return backend.BlockdevRemove(bdev)
126

    
127
  @staticmethod
128
  def perspective_blockdev_rename(params):
129
    """Remove a block device.
130

    
131
    """
132
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133
    return backend.BlockdevRename(devlist)
134

    
135
  @staticmethod
136
  def perspective_blockdev_assemble(params):
137
    """Assemble a block device.
138

    
139
    """
140
    bdev_s, owner, on_primary = params
141
    bdev = objects.Disk.FromDict(bdev_s)
142
    if bdev is None:
143
      raise ValueError("can't unserialize data!")
144
    return backend.BlockdevAssemble(bdev, owner, on_primary)
145

    
146
  @staticmethod
147
  def perspective_blockdev_shutdown(params):
148
    """Shutdown a block device.
149

    
150
    """
151
    bdev_s = params[0]
152
    bdev = objects.Disk.FromDict(bdev_s)
153
    if bdev is None:
154
      raise ValueError("can't unserialize data!")
155
    return backend.BlockdevShutdown(bdev)
156

    
157
  @staticmethod
158
  def perspective_blockdev_addchildren(params):
159
    """Add a child to a mirror device.
160

    
161
    Note: this is only valid for mirror devices. It's the caller's duty
162
    to send a correct disk, otherwise we raise an error.
163

    
164
    """
165
    bdev_s, ndev_s = params
166
    bdev = objects.Disk.FromDict(bdev_s)
167
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168
    if bdev is None or ndevs.count(None) > 0:
169
      raise ValueError("can't unserialize data!")
170
    return backend.BlockdevAddchildren(bdev, ndevs)
171

    
172
  @staticmethod
173
  def perspective_blockdev_removechildren(params):
174
    """Remove a child from a mirror device.
175

    
176
    This is only valid for mirror devices, of course. It's the callers
177
    duty to send a correct disk, otherwise we raise an error.
178

    
179
    """
180
    bdev_s, ndev_s = params
181
    bdev = objects.Disk.FromDict(bdev_s)
182
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183
    if bdev is None or ndevs.count(None) > 0:
184
      raise ValueError("can't unserialize data!")
185
    return backend.BlockdevRemovechildren(bdev, ndevs)
186

    
187
  @staticmethod
188
  def perspective_blockdev_getmirrorstatus(params):
189
    """Return the mirror status for a list of disks.
190

    
191
    """
192
    disks = [objects.Disk.FromDict(dsk_s)
193
            for dsk_s in params]
194
    return backend.BlockdevGetmirrorstatus(disks)
195

    
196
  @staticmethod
197
  def perspective_blockdev_find(params):
198
    """Expose the FindBlockDevice functionality for a disk.
199

    
200
    This will try to find but not activate a disk.
201

    
202
    """
203
    disk = objects.Disk.FromDict(params[0])
204
    return backend.BlockdevFind(disk)
205

    
206
  @staticmethod
207
  def perspective_blockdev_snapshot(params):
208
    """Create a snapshot device.
209

    
210
    Note that this is only valid for LVM disks, if we get passed
211
    something else we raise an exception. The snapshot device can be
212
    remove by calling the generic block device remove call.
213

    
214
    """
215
    cfbd = objects.Disk.FromDict(params[0])
216
    return backend.BlockdevSnapshot(cfbd)
217

    
218
  @staticmethod
219
  def perspective_blockdev_grow(params):
220
    """Grow a stack of devices.
221

    
222
    """
223
    cfbd = objects.Disk.FromDict(params[0])
224
    amount = params[1]
225
    return backend.BlockdevGrow(cfbd, amount)
226

    
227
  @staticmethod
228
  def perspective_blockdev_close(params):
229
    """Closes the given block devices.
230

    
231
    """
232
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
233
    return backend.BlockdevClose(params[0], disks)
234

    
235
  @staticmethod
236
  def perspective_blockdev_getsize(params):
237
    """Compute the sizes of the given block devices.
238

    
239
    """
240
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
241
    return backend.BlockdevGetsize(disks)
242

    
243
  # blockdev/drbd specific methods ----------
244

    
245
  @staticmethod
246
  def perspective_drbd_disconnect_net(params):
247
    """Disconnects the network connection of drbd disks.
248

    
249
    Note that this is only valid for drbd disks, so the members of the
250
    disk list must all be drbd devices.
251

    
252
    """
253
    nodes_ip, disks = params
254
    disks = [objects.Disk.FromDict(cf) for cf in disks]
255
    return backend.DrbdDisconnectNet(nodes_ip, disks)
256

    
257
  @staticmethod
258
  def perspective_drbd_attach_net(params):
259
    """Attaches the network connection of drbd disks.
260

    
261
    Note that this is only valid for drbd disks, so the members of the
262
    disk list must all be drbd devices.
263

    
264
    """
265
    nodes_ip, disks, instance_name, multimaster = params
266
    disks = [objects.Disk.FromDict(cf) for cf in disks]
267
    return backend.DrbdAttachNet(nodes_ip, disks,
268
                                     instance_name, multimaster)
269

    
270
  @staticmethod
271
  def perspective_drbd_wait_sync(params):
272
    """Wait until DRBD disks are synched.
273

    
274
    Note that this is only valid for drbd disks, so the members of the
275
    disk list must all be drbd devices.
276

    
277
    """
278
    nodes_ip, disks = params
279
    disks = [objects.Disk.FromDict(cf) for cf in disks]
280
    return backend.DrbdWaitSync(nodes_ip, disks)
281

    
282
  # export/import  --------------------------
283

    
284
  @staticmethod
285
  def perspective_snapshot_export(params):
286
    """Export a given snapshot.
287

    
288
    """
289
    disk = objects.Disk.FromDict(params[0])
290
    dest_node = params[1]
291
    instance = objects.Instance.FromDict(params[2])
292
    cluster_name = params[3]
293
    dev_idx = params[4]
294
    return backend.ExportSnapshot(disk, dest_node, instance,
295
                                  cluster_name, dev_idx)
296

    
297
  @staticmethod
298
  def perspective_finalize_export(params):
299
    """Expose the finalize export functionality.
300

    
301
    """
302
    instance = objects.Instance.FromDict(params[0])
303
    snap_disks = [objects.Disk.FromDict(str_data)
304
                  for str_data in params[1]]
305
    return backend.FinalizeExport(instance, snap_disks)
306

    
307
  @staticmethod
308
  def perspective_export_info(params):
309
    """Query information about an existing export on this node.
310

    
311
    The given path may not contain an export, in which case we return
312
    None.
313

    
314
    """
315
    path = params[0]
316
    einfo = backend.ExportInfo(path)
317
    if einfo is None:
318
      return einfo
319
    return einfo.Dumps()
320

    
321
  @staticmethod
322
  def perspective_export_list(params):
323
    """List the available exports on this node.
324

    
325
    Note that as opposed to export_info, which may query data about an
326
    export in any path, this only queries the standard Ganeti path
327
    (constants.EXPORT_DIR).
328

    
329
    """
330
    return backend.ListExports()
331

    
332
  @staticmethod
333
  def perspective_export_remove(params):
334
    """Remove an export.
335

    
336
    """
337
    export = params[0]
338
    return backend.RemoveExport(export)
339

    
340
  # volume  --------------------------
341

    
342
  @staticmethod
343
  def perspective_volume_list(params):
344
    """Query the list of logical volumes in a given volume group.
345

    
346
    """
347
    vgname = params[0]
348
    return backend.GetVolumeList(vgname)
349

    
350
  @staticmethod
351
  def perspective_vg_list(params):
352
    """Query the list of volume groups.
353

    
354
    """
355
    return backend.ListVolumeGroups()
356

    
357
  # bridge  --------------------------
358

    
359
  @staticmethod
360
  def perspective_bridges_exist(params):
361
    """Check if all bridges given exist on this node.
362

    
363
    """
364
    bridges_list = params[0]
365
    return backend.BridgesExist(bridges_list)
366

    
367
  # instance  --------------------------
368

    
369
  @staticmethod
370
  def perspective_instance_os_add(params):
371
    """Install an OS on a given instance.
372

    
373
    """
374
    inst_s = params[0]
375
    inst = objects.Instance.FromDict(inst_s)
376
    return backend.InstanceOsAdd(inst)
377

    
378
  @staticmethod
379
  def perspective_instance_run_rename(params):
380
    """Runs the OS rename script for an instance.
381

    
382
    """
383
    inst_s, old_name = params
384
    inst = objects.Instance.FromDict(inst_s)
385
    return backend.RunRenameInstance(inst, old_name)
386

    
387
  @staticmethod
388
  def perspective_instance_os_import(params):
389
    """Run the import function of an OS onto a given instance.
390

    
391
    """
392
    inst_s, src_node, src_images, cluster_name = params
393
    inst = objects.Instance.FromDict(inst_s)
394
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
395
                                        cluster_name)
396

    
397
  @staticmethod
398
  def perspective_instance_shutdown(params):
399
    """Shutdown an instance.
400

    
401
    """
402
    instance = objects.Instance.FromDict(params[0])
403
    return backend.InstanceShutdown(instance)
404

    
405
  @staticmethod
406
  def perspective_instance_start(params):
407
    """Start an instance.
408

    
409
    """
410
    instance = objects.Instance.FromDict(params[0])
411
    return backend.StartInstance(instance)
412

    
413
  @staticmethod
414
  def perspective_migration_info(params):
415
    """Gather information about an instance to be migrated.
416

    
417
    """
418
    instance = objects.Instance.FromDict(params[0])
419
    return backend.MigrationInfo(instance)
420

    
421
  @staticmethod
422
  def perspective_accept_instance(params):
423
    """Prepare the node to accept an instance.
424

    
425
    """
426
    instance, info, target = params
427
    instance = objects.Instance.FromDict(instance)
428
    return backend.AcceptInstance(instance, info, target)
429

    
430
  @staticmethod
431
  def perspective_finalize_migration(params):
432
    """Finalize the instance migration.
433

    
434
    """
435
    instance, info, success = params
436
    instance = objects.Instance.FromDict(instance)
437
    return backend.FinalizeMigration(instance, info, success)
438

    
439
  @staticmethod
440
  def perspective_instance_migrate(params):
441
    """Migrates an instance.
442

    
443
    """
444
    instance, target, live = params
445
    instance = objects.Instance.FromDict(instance)
446
    return backend.MigrateInstance(instance, target, live)
447

    
448
  @staticmethod
449
  def perspective_instance_reboot(params):
450
    """Reboot an instance.
451

    
452
    """
453
    instance = objects.Instance.FromDict(params[0])
454
    reboot_type = params[1]
455
    return backend.InstanceReboot(instance, reboot_type)
456

    
457
  @staticmethod
458
  def perspective_instance_info(params):
459
    """Query instance information.
460

    
461
    """
462
    return backend.GetInstanceInfo(params[0], params[1])
463

    
464
  @staticmethod
465
  def perspective_instance_migratable(params):
466
    """Query whether the specified instance can be migrated.
467

    
468
    """
469
    instance = objects.Instance.FromDict(params[0])
470
    return backend.GetInstanceMigratable(instance)
471

    
472
  @staticmethod
473
  def perspective_all_instances_info(params):
474
    """Query information about all instances.
475

    
476
    """
477
    return backend.GetAllInstancesInfo(params[0])
478

    
479
  @staticmethod
480
  def perspective_instance_list(params):
481
    """Query the list of running instances.
482

    
483
    """
484
    return backend.GetInstanceList(params[0])
485

    
486
  # node --------------------------
487

    
488
  @staticmethod
489
  def perspective_node_tcp_ping(params):
490
    """Do a TcpPing on the remote node.
491

    
492
    """
493
    return utils.TcpPing(params[1], params[2], timeout=params[3],
494
                         live_port_needed=params[4], source=params[0])
495

    
496
  @staticmethod
497
  def perspective_node_has_ip_address(params):
498
    """Checks if a node has the given ip address.
499

    
500
    """
501
    return utils.OwnIpAddress(params[0])
502

    
503
  @staticmethod
504
  def perspective_node_info(params):
505
    """Query node information.
506

    
507
    """
508
    vgname, hypervisor_type = params
509
    return backend.GetNodeInfo(vgname, hypervisor_type)
510

    
511
  @staticmethod
512
  def perspective_node_add(params):
513
    """Complete the registration of this node in the cluster.
514

    
515
    """
516
    return backend.AddNode(params[0], params[1], params[2],
517
                           params[3], params[4], params[5])
518

    
519
  @staticmethod
520
  def perspective_node_verify(params):
521
    """Run a verify sequence on this node.
522

    
523
    """
524
    return backend.VerifyNode(params[0], params[1])
525

    
526
  @staticmethod
527
  def perspective_node_start_master(params):
528
    """Promote this node to master status.
529

    
530
    """
531
    return backend.StartMaster(params[0], params[1])
532

    
533
  @staticmethod
534
  def perspective_node_stop_master(params):
535
    """Demote this node from master status.
536

    
537
    """
538
    return backend.StopMaster(params[0])
539

    
540
  @staticmethod
541
  def perspective_node_leave_cluster(params):
542
    """Cleanup after leaving a cluster.
543

    
544
    """
545
    return backend.LeaveCluster()
546

    
547
  @staticmethod
548
  def perspective_node_volumes(params):
549
    """Query the list of all logical volume groups.
550

    
551
    """
552
    return backend.NodeVolumes()
553

    
554
  @staticmethod
555
  def perspective_node_demote_from_mc(params):
556
    """Demote a node from the master candidate role.
557

    
558
    """
559
    return backend.DemoteFromMC()
560

    
561

    
562
  # cluster --------------------------
563

    
564
  @staticmethod
565
  def perspective_version(params):
566
    """Query version information.
567

    
568
    """
569
    return constants.PROTOCOL_VERSION
570

    
571
  @staticmethod
572
  def perspective_upload_file(params):
573
    """Upload a file.
574

    
575
    Note that the backend implementation imposes strict rules on which
576
    files are accepted.
577

    
578
    """
579
    return backend.UploadFile(*params)
580

    
581
  @staticmethod
582
  def perspective_master_info(params):
583
    """Query master information.
584

    
585
    """
586
    return backend.GetMasterInfo()
587

    
588
  @staticmethod
589
  def perspective_write_ssconf_files(params):
590
    """Write ssconf files.
591

    
592
    """
593
    (values,) = params
594
    return backend.WriteSsconfFiles(values)
595

    
596
  # os -----------------------
597

    
598
  @staticmethod
599
  def perspective_os_diagnose(params):
600
    """Query detailed information about existing OSes.
601

    
602
    """
603
    return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
604

    
605
  @staticmethod
606
  def perspective_os_get(params):
607
    """Query information about a given OS.
608

    
609
    """
610
    name = params[0]
611
    try:
612
      os_obj = backend.OSFromDisk(name)
613
    except errors.InvalidOS, err:
614
      os_obj = objects.OS.FromInvalidOS(err)
615
    return os_obj.ToDict()
616

    
617
  # hooks -----------------------
618

    
619
  @staticmethod
620
  def perspective_hooks_runner(params):
621
    """Run hook scripts.
622

    
623
    """
624
    hpath, phase, env = params
625
    hr = backend.HooksRunner()
626
    return hr.RunHooks(hpath, phase, env)
627

    
628
  # iallocator -----------------
629

    
630
  @staticmethod
631
  def perspective_iallocator_runner(params):
632
    """Run an iallocator script.
633

    
634
    """
635
    name, idata = params
636
    iar = backend.IAllocatorRunner()
637
    return iar.Run(name, idata)
638

    
639
  # test -----------------------
640

    
641
  @staticmethod
642
  def perspective_test_delay(params):
643
    """Run test delay.
644

    
645
    """
646
    duration = params[0]
647
    return utils.TestDelay(duration)
648

    
649
  # file storage ---------------
650

    
651
  @staticmethod
652
  def perspective_file_storage_dir_create(params):
653
    """Create the file storage directory.
654

    
655
    """
656
    file_storage_dir = params[0]
657
    return backend.CreateFileStorageDir(file_storage_dir)
658

    
659
  @staticmethod
660
  def perspective_file_storage_dir_remove(params):
661
    """Remove the file storage directory.
662

    
663
    """
664
    file_storage_dir = params[0]
665
    return backend.RemoveFileStorageDir(file_storage_dir)
666

    
667
  @staticmethod
668
  def perspective_file_storage_dir_rename(params):
669
    """Rename the file storage directory.
670

    
671
    """
672
    old_file_storage_dir = params[0]
673
    new_file_storage_dir = params[1]
674
    return backend.RenameFileStorageDir(old_file_storage_dir,
675
                                        new_file_storage_dir)
676

    
677
  # jobs ------------------------
678

    
679
  @staticmethod
680
  @_RequireJobQueueLock
681
  def perspective_jobqueue_update(params):
682
    """Update job queue.
683

    
684
    """
685
    (file_name, content) = params
686
    return backend.JobQueueUpdate(file_name, content)
687

    
688
  @staticmethod
689
  @_RequireJobQueueLock
690
  def perspective_jobqueue_purge(params):
691
    """Purge job queue.
692

    
693
    """
694
    return backend.JobQueuePurge()
695

    
696
  @staticmethod
697
  @_RequireJobQueueLock
698
  def perspective_jobqueue_rename(params):
699
    """Rename a job queue file.
700

    
701
    """
702
    # TODO: What if a file fails to rename?
703
    return [backend.JobQueueRename(old, new) for old, new in params]
704

    
705
  @staticmethod
706
  def perspective_jobqueue_set_drain(params):
707
    """Set/unset the queue drain flag.
708

    
709
    """
710
    drain_flag = params[0]
711
    return backend.JobQueueSetDrainFlag(drain_flag)
712

    
713

    
714
  # hypervisor ---------------
715

    
716
  @staticmethod
717
  def perspective_hypervisor_validate_params(params):
718
    """Validate the hypervisor parameters.
719

    
720
    """
721
    (hvname, hvparams) = params
722
    return backend.ValidateHVParams(hvname, hvparams)
723

    
724

    
725
def ParseOptions():
726
  """Parse the command line options.
727

    
728
  @return: (options, args) as from OptionParser.parse_args()
729

    
730
  """
731
  parser = OptionParser(description="Ganeti node daemon",
732
                        usage="%prog [-f] [-d] [-b ADDRESS]",
733
                        version="%%prog (ganeti) %s" %
734
                        constants.RELEASE_VERSION)
735

    
736
  parser.add_option("-f", "--foreground", dest="fork",
737
                    help="Don't detach from the current terminal",
738
                    default=True, action="store_false")
739
  parser.add_option("-d", "--debug", dest="debug",
740
                    help="Enable some debug messages",
741
                    default=False, action="store_true")
742
  parser.add_option("-b", "--bind", dest="bind_address",
743
                    help="Bind address",
744
                    default="", metavar="ADDRESS")
745

    
746
  options, args = parser.parse_args()
747
  return options, args
748

    
749

    
750
def main():
751
  """Main function for the node daemon.
752

    
753
  """
754
  global queue_lock
755

    
756
  options, args = ParseOptions()
757
  utils.debug = options.debug
758

    
759
  if options.fork:
760
    utils.CloseFDs()
761

    
762
  for fname in (constants.SSL_CERT_FILE,):
763
    if not os.path.isfile(fname):
764
      print "config %s not there, will not run." % fname
765
      sys.exit(5)
766

    
767
  try:
768
    port = utils.GetNodeDaemonPort()
769
  except errors.ConfigurationError, err:
770
    print "Cluster configuration incomplete: '%s'" % str(err)
771
    sys.exit(5)
772

    
773
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
774
  dirs.append((constants.LOG_OS_DIR, 0750))
775
  dirs.append((constants.LOCK_DIR, 1777))
776
  utils.EnsureDirs(dirs)
777

    
778
  # become a daemon
779
  if options.fork:
780
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
781

    
782
  utils.WritePidFile(constants.NODED_PID)
783
  try:
784
    utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
785
                       stderr_logging=not options.fork)
786
    logging.info("ganeti node daemon startup")
787

    
788
    # Read SSL certificate
789
    ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
790
                                    ssl_cert_path=constants.SSL_CERT_FILE)
791

    
792
    # Prepare job queue
793
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
794

    
795
    mainloop = daemon.Mainloop()
796
    server = NodeHttpServer(mainloop, options.bind_address, port,
797
                            ssl_params=ssl_params, ssl_verify_peer=True)
798
    server.Start()
799
    try:
800
      mainloop.Run()
801
    finally:
802
      server.Stop()
803
  finally:
804
    utils.RemovePidFile(constants.NODED_PID)
805

    
806

    
807
if __name__ == '__main__':
808
  main()