Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ c2fc8250

History | View | Annotate | Download (21 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import constants
39
from ganeti import objects
40
from ganeti import errors
41
from ganeti import jstore
42
from ganeti import daemon
43
from ganeti import http
44
from ganeti import utils
45

    
46
import ganeti.http.server
47

    
48

    
49
queue_lock = None
50

    
51

    
52
def _RequireJobQueueLock(fn):
53
  """Decorator for job queue manipulating functions.
54

    
55
  """
56
  QUEUE_LOCK_TIMEOUT = 10
57

    
58
  def wrapper(*args, **kwargs):
59
    # Locking in exclusive, blocking mode because there could be several
60
    # children running at the same time. Waiting up to 10 seconds.
61
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62
    try:
63
      return fn(*args, **kwargs)
64
    finally:
65
      queue_lock.Unlock()
66

    
67
  return wrapper
68

    
69

    
70
class NodeHttpServer(http.server.HttpServer):
71
  """The server implementation.
72

    
73
  This class holds all methods exposed over the RPC interface.
74

    
75
  """
76
  def __init__(self, *args, **kwargs):
77
    http.server.HttpServer.__init__(self, *args, **kwargs)
78
    self.noded_pid = os.getpid()
79

    
80
  def HandleRequest(self, req):
81
    """Handle a request.
82

    
83
    """
84
    if req.request_method.upper() != http.HTTP_PUT:
85
      raise http.HttpBadRequest()
86

    
87
    path = req.request_path
88
    if path.startswith("/"):
89
      path = path[1:]
90

    
91
    method = getattr(self, "perspective_%s" % path, None)
92
    if method is None:
93
      raise http.HttpNotFound()
94

    
95
    try:
96
      try:
97
        return method(req.request_body)
98
      except backend.RPCFail, err:
99
        # our custom failure exception; str(err) works fine if the
100
        # exception was constructed with a single argument, and in
101
        # this case, err.message == err.args[0] == str(err)
102
        return (False, str(err))
103
      except:
104
        logging.exception("Error in RPC call")
105
        raise
106
    except errors.QuitGanetiException, err:
107
      # Tell parent to quit
108
      os.kill(self.noded_pid, signal.SIGTERM)
109

    
110
  # the new block devices  --------------------------
111

    
112
  @staticmethod
113
  def perspective_blockdev_create(params):
114
    """Create a block device.
115

    
116
    """
117
    bdev_s, size, owner, on_primary, info = params
118
    bdev = objects.Disk.FromDict(bdev_s)
119
    if bdev is None:
120
      raise ValueError("can't unserialize data!")
121
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
122

    
123
  @staticmethod
124
  def perspective_blockdev_remove(params):
125
    """Remove a block device.
126

    
127
    """
128
    bdev_s = params[0]
129
    bdev = objects.Disk.FromDict(bdev_s)
130
    return backend.BlockdevRemove(bdev)
131

    
132
  @staticmethod
133
  def perspective_blockdev_rename(params):
134
    """Remove a block device.
135

    
136
    """
137
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
138
    return backend.BlockdevRename(devlist)
139

    
140
  @staticmethod
141
  def perspective_blockdev_assemble(params):
142
    """Assemble a block device.
143

    
144
    """
145
    bdev_s, owner, on_primary = params
146
    bdev = objects.Disk.FromDict(bdev_s)
147
    if bdev is None:
148
      raise ValueError("can't unserialize data!")
149
    return backend.BlockdevAssemble(bdev, owner, on_primary)
150

    
151
  @staticmethod
152
  def perspective_blockdev_shutdown(params):
153
    """Shutdown a block device.
154

    
155
    """
156
    bdev_s = params[0]
157
    bdev = objects.Disk.FromDict(bdev_s)
158
    if bdev is None:
159
      raise ValueError("can't unserialize data!")
160
    return backend.BlockdevShutdown(bdev)
161

    
162
  @staticmethod
163
  def perspective_blockdev_addchildren(params):
164
    """Add a child to a mirror device.
165

    
166
    Note: this is only valid for mirror devices. It's the caller's duty
167
    to send a correct disk, otherwise we raise an error.
168

    
169
    """
170
    bdev_s, ndev_s = params
171
    bdev = objects.Disk.FromDict(bdev_s)
172
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
173
    if bdev is None or ndevs.count(None) > 0:
174
      raise ValueError("can't unserialize data!")
175
    return backend.BlockdevAddchildren(bdev, ndevs)
176

    
177
  @staticmethod
178
  def perspective_blockdev_removechildren(params):
179
    """Remove a child from a mirror device.
180

    
181
    This is only valid for mirror devices, of course. It's the callers
182
    duty to send a correct disk, otherwise we raise an error.
183

    
184
    """
185
    bdev_s, ndev_s = params
186
    bdev = objects.Disk.FromDict(bdev_s)
187
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
188
    if bdev is None or ndevs.count(None) > 0:
189
      raise ValueError("can't unserialize data!")
190
    return backend.BlockdevRemovechildren(bdev, ndevs)
191

    
192
  @staticmethod
193
  def perspective_blockdev_getmirrorstatus(params):
194
    """Return the mirror status for a list of disks.
195

    
196
    """
197
    disks = [objects.Disk.FromDict(dsk_s)
198
            for dsk_s in params]
199
    return backend.BlockdevGetmirrorstatus(disks)
200

    
201
  @staticmethod
202
  def perspective_blockdev_find(params):
203
    """Expose the FindBlockDevice functionality for a disk.
204

    
205
    This will try to find but not activate a disk.
206

    
207
    """
208
    disk = objects.Disk.FromDict(params[0])
209
    return backend.BlockdevFind(disk)
210

    
211
  @staticmethod
212
  def perspective_blockdev_snapshot(params):
213
    """Create a snapshot device.
214

    
215
    Note that this is only valid for LVM disks, if we get passed
216
    something else we raise an exception. The snapshot device can be
217
    remove by calling the generic block device remove call.
218

    
219
    """
220
    cfbd = objects.Disk.FromDict(params[0])
221
    return backend.BlockdevSnapshot(cfbd)
222

    
223
  @staticmethod
224
  def perspective_blockdev_grow(params):
225
    """Grow a stack of devices.
226

    
227
    """
228
    cfbd = objects.Disk.FromDict(params[0])
229
    amount = params[1]
230
    return backend.BlockdevGrow(cfbd, amount)
231

    
232
  @staticmethod
233
  def perspective_blockdev_close(params):
234
    """Closes the given block devices.
235

    
236
    """
237
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
238
    return backend.BlockdevClose(params[0], disks)
239

    
240
  # blockdev/drbd specific methods ----------
241

    
242
  @staticmethod
243
  def perspective_drbd_disconnect_net(params):
244
    """Disconnects the network connection of drbd disks.
245

    
246
    Note that this is only valid for drbd disks, so the members of the
247
    disk list must all be drbd devices.
248

    
249
    """
250
    nodes_ip, disks = params
251
    disks = [objects.Disk.FromDict(cf) for cf in disks]
252
    return backend.DrbdDisconnectNet(nodes_ip, disks)
253

    
254
  @staticmethod
255
  def perspective_drbd_attach_net(params):
256
    """Attaches the network connection of drbd disks.
257

    
258
    Note that this is only valid for drbd disks, so the members of the
259
    disk list must all be drbd devices.
260

    
261
    """
262
    nodes_ip, disks, instance_name, multimaster = params
263
    disks = [objects.Disk.FromDict(cf) for cf in disks]
264
    return backend.DrbdAttachNet(nodes_ip, disks,
265
                                     instance_name, multimaster)
266

    
267
  @staticmethod
268
  def perspective_drbd_wait_sync(params):
269
    """Wait until DRBD disks are synched.
270

    
271
    Note that this is only valid for drbd disks, so the members of the
272
    disk list must all be drbd devices.
273

    
274
    """
275
    nodes_ip, disks = params
276
    disks = [objects.Disk.FromDict(cf) for cf in disks]
277
    return backend.DrbdWaitSync(nodes_ip, disks)
278

    
279
  # export/import  --------------------------
280

    
281
  @staticmethod
282
  def perspective_snapshot_export(params):
283
    """Export a given snapshot.
284

    
285
    """
286
    disk = objects.Disk.FromDict(params[0])
287
    dest_node = params[1]
288
    instance = objects.Instance.FromDict(params[2])
289
    cluster_name = params[3]
290
    dev_idx = params[4]
291
    return backend.ExportSnapshot(disk, dest_node, instance,
292
                                  cluster_name, dev_idx)
293

    
294
  @staticmethod
295
  def perspective_finalize_export(params):
296
    """Expose the finalize export functionality.
297

    
298
    """
299
    instance = objects.Instance.FromDict(params[0])
300
    snap_disks = [objects.Disk.FromDict(str_data)
301
                  for str_data in params[1]]
302
    return backend.FinalizeExport(instance, snap_disks)
303

    
304
  @staticmethod
305
  def perspective_export_info(params):
306
    """Query information about an existing export on this node.
307

    
308
    The given path may not contain an export, in which case we return
309
    None.
310

    
311
    """
312
    path = params[0]
313
    return backend.ExportInfo(path)
314

    
315
  @staticmethod
316
  def perspective_export_list(params):
317
    """List the available exports on this node.
318

    
319
    Note that as opposed to export_info, which may query data about an
320
    export in any path, this only queries the standard Ganeti path
321
    (constants.EXPORT_DIR).
322

    
323
    """
324
    return backend.ListExports()
325

    
326
  @staticmethod
327
  def perspective_export_remove(params):
328
    """Remove an export.
329

    
330
    """
331
    export = params[0]
332
    return backend.RemoveExport(export)
333

    
334
  # volume  --------------------------
335

    
336
  @staticmethod
337
  def perspective_volume_list(params):
338
    """Query the list of logical volumes in a given volume group.
339

    
340
    """
341
    vgname = params[0]
342
    return True, backend.GetVolumeList(vgname)
343

    
344
  @staticmethod
345
  def perspective_vg_list(params):
346
    """Query the list of volume groups.
347

    
348
    """
349
    return backend.ListVolumeGroups()
350

    
351
  # bridge  --------------------------
352

    
353
  @staticmethod
354
  def perspective_bridges_exist(params):
355
    """Check if all bridges given exist on this node.
356

    
357
    """
358
    bridges_list = params[0]
359
    return backend.BridgesExist(bridges_list)
360

    
361
  # instance  --------------------------
362

    
363
  @staticmethod
364
  def perspective_instance_os_add(params):
365
    """Install an OS on a given instance.
366

    
367
    """
368
    inst_s = params[0]
369
    inst = objects.Instance.FromDict(inst_s)
370
    reinstall = params[1]
371
    return backend.InstanceOsAdd(inst, reinstall)
372

    
373
  @staticmethod
374
  def perspective_instance_run_rename(params):
375
    """Runs the OS rename script for an instance.
376

    
377
    """
378
    inst_s, old_name = params
379
    inst = objects.Instance.FromDict(inst_s)
380
    return backend.RunRenameInstance(inst, old_name)
381

    
382
  @staticmethod
383
  def perspective_instance_os_import(params):
384
    """Run the import function of an OS onto a given instance.
385

    
386
    """
387
    inst_s, src_node, src_images, cluster_name = params
388
    inst = objects.Instance.FromDict(inst_s)
389
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
390
                                        cluster_name)
391

    
392
  @staticmethod
393
  def perspective_instance_shutdown(params):
394
    """Shutdown an instance.
395

    
396
    """
397
    instance = objects.Instance.FromDict(params[0])
398
    return backend.InstanceShutdown(instance)
399

    
400
  @staticmethod
401
  def perspective_instance_start(params):
402
    """Start an instance.
403

    
404
    """
405
    instance = objects.Instance.FromDict(params[0])
406
    return backend.StartInstance(instance)
407

    
408
  @staticmethod
409
  def perspective_migration_info(params):
410
    """Gather information about an instance to be migrated.
411

    
412
    """
413
    instance = objects.Instance.FromDict(params[0])
414
    return backend.MigrationInfo(instance)
415

    
416
  @staticmethod
417
  def perspective_accept_instance(params):
418
    """Prepare the node to accept an instance.
419

    
420
    """
421
    instance, info, target = params
422
    instance = objects.Instance.FromDict(instance)
423
    return backend.AcceptInstance(instance, info, target)
424

    
425
  @staticmethod
426
  def perspective_finalize_migration(params):
427
    """Finalize the instance migration.
428

    
429
    """
430
    instance, info, success = params
431
    instance = objects.Instance.FromDict(instance)
432
    return backend.FinalizeMigration(instance, info, success)
433

    
434
  @staticmethod
435
  def perspective_instance_migrate(params):
436
    """Migrates an instance.
437

    
438
    """
439
    instance, target, live = params
440
    instance = objects.Instance.FromDict(instance)
441
    return backend.MigrateInstance(instance, target, live)
442

    
443
  @staticmethod
444
  def perspective_instance_reboot(params):
445
    """Reboot an instance.
446

    
447
    """
448
    instance = objects.Instance.FromDict(params[0])
449
    reboot_type = params[1]
450
    return backend.InstanceReboot(instance, reboot_type)
451

    
452
  @staticmethod
453
  def perspective_instance_info(params):
454
    """Query instance information.
455

    
456
    """
457
    return backend.GetInstanceInfo(params[0], params[1])
458

    
459
  @staticmethod
460
  def perspective_instance_migratable(params):
461
    """Query whether the specified instance can be migrated.
462

    
463
    """
464
    instance = objects.Instance.FromDict(params[0])
465
    return backend.GetInstanceMigratable(instance)
466

    
467
  @staticmethod
468
  def perspective_all_instances_info(params):
469
    """Query information about all instances.
470

    
471
    """
472
    return backend.GetAllInstancesInfo(params[0])
473

    
474
  @staticmethod
475
  def perspective_instance_list(params):
476
    """Query the list of running instances.
477

    
478
    """
479
    return True, backend.GetInstanceList(params[0])
480

    
481
  # node --------------------------
482

    
483
  @staticmethod
484
  def perspective_node_tcp_ping(params):
485
    """Do a TcpPing on the remote node.
486

    
487
    """
488
    return utils.TcpPing(params[1], params[2], timeout=params[3],
489
                         live_port_needed=params[4], source=params[0])
490

    
491
  @staticmethod
492
  def perspective_node_has_ip_address(params):
493
    """Checks if a node has the given ip address.
494

    
495
    """
496
    return True, utils.OwnIpAddress(params[0])
497

    
498
  @staticmethod
499
  def perspective_node_info(params):
500
    """Query node information.
501

    
502
    """
503
    vgname, hypervisor_type = params
504
    return backend.GetNodeInfo(vgname, hypervisor_type)
505

    
506
  @staticmethod
507
  def perspective_node_add(params):
508
    """Complete the registration of this node in the cluster.
509

    
510
    """
511
    return backend.AddNode(params[0], params[1], params[2],
512
                           params[3], params[4], params[5])
513

    
514
  @staticmethod
515
  def perspective_node_verify(params):
516
    """Run a verify sequence on this node.
517

    
518
    """
519
    return backend.VerifyNode(params[0], params[1])
520

    
521
  @staticmethod
522
  def perspective_node_start_master(params):
523
    """Promote this node to master status.
524

    
525
    """
526
    return backend.StartMaster(params[0])
527

    
528
  @staticmethod
529
  def perspective_node_stop_master(params):
530
    """Demote this node from master status.
531

    
532
    """
533
    return backend.StopMaster(params[0])
534

    
535
  @staticmethod
536
  def perspective_node_leave_cluster(params):
537
    """Cleanup after leaving a cluster.
538

    
539
    """
540
    return backend.LeaveCluster()
541

    
542
  @staticmethod
543
  def perspective_node_volumes(params):
544
    """Query the list of all logical volume groups.
545

    
546
    """
547
    return backend.NodeVolumes()
548

    
549
  @staticmethod
550
  def perspective_node_demote_from_mc(params):
551
    """Demote a node from the master candidate role.
552

    
553
    """
554
    return backend.DemoteFromMC()
555

    
556

    
557
  @staticmethod
558
  def perspective_node_powercycle(params):
559
    """Tries to powercycle the nod.
560

    
561
    """
562
    hypervisor_type = params[0]
563
    return backend.PowercycleNode(hypervisor_type)
564

    
565

    
566
  # cluster --------------------------
567

    
568
  @staticmethod
569
  def perspective_version(params):
570
    """Query version information.
571

    
572
    """
573
    return constants.PROTOCOL_VERSION
574

    
575
  @staticmethod
576
  def perspective_upload_file(params):
577
    """Upload a file.
578

    
579
    Note that the backend implementation imposes strict rules on which
580
    files are accepted.
581

    
582
    """
583
    return backend.UploadFile(*params)
584

    
585
  @staticmethod
586
  def perspective_master_info(params):
587
    """Query master information.
588

    
589
    """
590
    return backend.GetMasterInfo()
591

    
592
  @staticmethod
593
  def perspective_write_ssconf_files(params):
594
    """Write ssconf files.
595

    
596
    """
597
    (values,) = params
598
    return backend.WriteSsconfFiles(values)
599

    
600
  # os -----------------------
601

    
602
  @staticmethod
603
  def perspective_os_diagnose(params):
604
    """Query detailed information about existing OSes.
605

    
606
    """
607
    return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
608

    
609
  @staticmethod
610
  def perspective_os_get(params):
611
    """Query information about a given OS.
612

    
613
    """
614
    name = params[0]
615
    try:
616
      os_obj = backend.OSFromDisk(name)
617
    except errors.InvalidOS, err:
618
      os_obj = objects.OS.FromInvalidOS(err)
619
    return os_obj.ToDict()
620

    
621
  # hooks -----------------------
622

    
623
  @staticmethod
624
  def perspective_hooks_runner(params):
625
    """Run hook scripts.
626

    
627
    """
628
    hpath, phase, env = params
629
    hr = backend.HooksRunner()
630
    return hr.RunHooks(hpath, phase, env)
631

    
632
  # iallocator -----------------
633

    
634
  @staticmethod
635
  def perspective_iallocator_runner(params):
636
    """Run an iallocator script.
637

    
638
    """
639
    name, idata = params
640
    iar = backend.IAllocatorRunner()
641
    return iar.Run(name, idata)
642

    
643
  # test -----------------------
644

    
645
  @staticmethod
646
  def perspective_test_delay(params):
647
    """Run test delay.
648

    
649
    """
650
    duration = params[0]
651
    return utils.TestDelay(duration)
652

    
653
  # file storage ---------------
654

    
655
  @staticmethod
656
  def perspective_file_storage_dir_create(params):
657
    """Create the file storage directory.
658

    
659
    """
660
    file_storage_dir = params[0]
661
    return backend.CreateFileStorageDir(file_storage_dir)
662

    
663
  @staticmethod
664
  def perspective_file_storage_dir_remove(params):
665
    """Remove the file storage directory.
666

    
667
    """
668
    file_storage_dir = params[0]
669
    return backend.RemoveFileStorageDir(file_storage_dir)
670

    
671
  @staticmethod
672
  def perspective_file_storage_dir_rename(params):
673
    """Rename the file storage directory.
674

    
675
    """
676
    old_file_storage_dir = params[0]
677
    new_file_storage_dir = params[1]
678
    return backend.RenameFileStorageDir(old_file_storage_dir,
679
                                        new_file_storage_dir)
680

    
681
  # jobs ------------------------
682

    
683
  @staticmethod
684
  @_RequireJobQueueLock
685
  def perspective_jobqueue_update(params):
686
    """Update job queue.
687

    
688
    """
689
    (file_name, content) = params
690
    return backend.JobQueueUpdate(file_name, content)
691

    
692
  @staticmethod
693
  @_RequireJobQueueLock
694
  def perspective_jobqueue_purge(params):
695
    """Purge job queue.
696

    
697
    """
698
    return backend.JobQueuePurge()
699

    
700
  @staticmethod
701
  @_RequireJobQueueLock
702
  def perspective_jobqueue_rename(params):
703
    """Rename a job queue file.
704

    
705
    """
706
    # TODO: What if a file fails to rename?
707
    return [backend.JobQueueRename(old, new) for old, new in params]
708

    
709
  @staticmethod
710
  def perspective_jobqueue_set_drain(params):
711
    """Set/unset the queue drain flag.
712

    
713
    """
714
    drain_flag = params[0]
715
    return backend.JobQueueSetDrainFlag(drain_flag)
716

    
717

    
718
  # hypervisor ---------------
719

    
720
  @staticmethod
721
  def perspective_hypervisor_validate_params(params):
722
    """Validate the hypervisor parameters.
723

    
724
    """
725
    (hvname, hvparams) = params
726
    return backend.ValidateHVParams(hvname, hvparams)
727

    
728

    
729
def ParseOptions():
730
  """Parse the command line options.
731

    
732
  @return: (options, args) as from OptionParser.parse_args()
733

    
734
  """
735
  parser = OptionParser(description="Ganeti node daemon",
736
                        usage="%prog [-f] [-d] [-b ADDRESS]",
737
                        version="%%prog (ganeti) %s" %
738
                        constants.RELEASE_VERSION)
739

    
740
  parser.add_option("-f", "--foreground", dest="fork",
741
                    help="Don't detach from the current terminal",
742
                    default=True, action="store_false")
743
  parser.add_option("-d", "--debug", dest="debug",
744
                    help="Enable some debug messages",
745
                    default=False, action="store_true")
746
  parser.add_option("-b", "--bind", dest="bind_address",
747
                    help="Bind address",
748
                    default="", metavar="ADDRESS")
749

    
750
  options, args = parser.parse_args()
751
  return options, args
752

    
753

    
754
def main():
755
  """Main function for the node daemon.
756

    
757
  """
758
  global queue_lock
759

    
760
  options, args = ParseOptions()
761
  utils.debug = options.debug
762

    
763
  if options.fork:
764
    utils.CloseFDs()
765

    
766
  for fname in (constants.SSL_CERT_FILE,):
767
    if not os.path.isfile(fname):
768
      print "config %s not there, will not run." % fname
769
      sys.exit(5)
770

    
771
  try:
772
    port = utils.GetNodeDaemonPort()
773
  except errors.ConfigurationError, err:
774
    print "Cluster configuration incomplete: '%s'" % str(err)
775
    sys.exit(5)
776

    
777
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
778
  dirs.append((constants.LOG_OS_DIR, 0750))
779
  dirs.append((constants.LOCK_DIR, 1777))
780
  utils.EnsureDirs(dirs)
781

    
782
  # become a daemon
783
  if options.fork:
784
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
785

    
786
  utils.WritePidFile(constants.NODED_PID)
787
  try:
788
    utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
789
                       stderr_logging=not options.fork)
790
    logging.info("ganeti node daemon startup")
791

    
792
    # Read SSL certificate
793
    ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
794
                                    ssl_cert_path=constants.SSL_CERT_FILE)
795

    
796
    # Prepare job queue
797
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
798

    
799
    mainloop = daemon.Mainloop()
800
    server = NodeHttpServer(mainloop, options.bind_address, port,
801
                            ssl_params=ssl_params, ssl_verify_peer=True)
802
    server.Start()
803
    try:
804
      mainloop.Run()
805
    finally:
806
      server.Stop()
807
  finally:
808
    utils.RemovePidFile(constants.NODED_PID)
809

    
810

    
811
if __name__ == '__main__':
812
  main()