Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ f93427cd

History | View | Annotate | Download (21.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import SocketServer
35
import logging
36
import signal
37

    
38
from optparse import OptionParser
39

    
40
from ganeti import backend
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import errors
44
from ganeti import jstore
45
from ganeti import daemon
46
from ganeti import http
47
from ganeti import utils
48
from ganeti import storage
49

    
50
import ganeti.http.server
51

    
52

    
53
queue_lock = None
54

    
55

    
56
def _RequireJobQueueLock(fn):
57
  """Decorator for job queue manipulating functions.
58

    
59
  """
60
  QUEUE_LOCK_TIMEOUT = 10
61

    
62
  def wrapper(*args, **kwargs):
63
    # Locking in exclusive, blocking mode because there could be several
64
    # children running at the same time. Waiting up to 10 seconds.
65
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
66
    try:
67
      return fn(*args, **kwargs)
68
    finally:
69
      queue_lock.Unlock()
70

    
71
  return wrapper
72

    
73

    
74
class NodeHttpServer(http.server.HttpServer): # pylint: disable-msg=R0904
75
  """The server implementation.
76

    
77
  This class holds all methods exposed over the RPC interface.
78

    
79
  """
80
  def __init__(self, *args, **kwargs):
81
    http.server.HttpServer.__init__(self, *args, **kwargs)
82
    self.noded_pid = os.getpid()
83

    
84
  def HandleRequest(self, req):
85
    """Handle a request.
86

    
87
    """
88
    if req.request_method.upper() != http.HTTP_PUT:
89
      raise http.HttpBadRequest()
90

    
91
    path = req.request_path
92
    if path.startswith("/"):
93
      path = path[1:]
94

    
95
    method = getattr(self, "perspective_%s" % path, None)
96
    if method is None:
97
      raise http.HttpNotFound()
98

    
99
    try:
100
      rvalue = method(req.request_body)
101
      return True, rvalue
102

    
103
    except backend.RPCFail, err:
104
      # our custom failure exception; str(err) works fine if the
105
      # exception was constructed with a single argument, and in
106
      # this case, err.message == err.args[0] == str(err)
107
      return (False, str(err))
108
    except errors.QuitGanetiException, err:
109
      # Tell parent to quit
110
      logging.info("Shutting down the node daemon, arguments: %s",
111
                   str(err.args))
112
      os.kill(self.noded_pid, signal.SIGTERM)
113
      # And return the error's arguments, which must be already in
114
      # correct tuple format
115
      return err.args
116
    except Exception, err:
117
      logging.exception("Error in RPC call")
118
      return False, "Error while executing backend function: %s" % str(err)
119

    
120
  # the new block devices  --------------------------
121

    
122
  @staticmethod
123
  def perspective_blockdev_create(params):
124
    """Create a block device.
125

    
126
    """
127
    bdev_s, size, owner, on_primary, info = params
128
    bdev = objects.Disk.FromDict(bdev_s)
129
    if bdev is None:
130
      raise ValueError("can't unserialize data!")
131
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
132

    
133
  @staticmethod
134
  def perspective_blockdev_remove(params):
135
    """Remove a block device.
136

    
137
    """
138
    bdev_s = params[0]
139
    bdev = objects.Disk.FromDict(bdev_s)
140
    return backend.BlockdevRemove(bdev)
141

    
142
  @staticmethod
143
  def perspective_blockdev_rename(params):
144
    """Remove a block device.
145

    
146
    """
147
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
148
    return backend.BlockdevRename(devlist)
149

    
150
  @staticmethod
151
  def perspective_blockdev_assemble(params):
152
    """Assemble a block device.
153

    
154
    """
155
    bdev_s, owner, on_primary = params
156
    bdev = objects.Disk.FromDict(bdev_s)
157
    if bdev is None:
158
      raise ValueError("can't unserialize data!")
159
    return backend.BlockdevAssemble(bdev, owner, on_primary)
160

    
161
  @staticmethod
162
  def perspective_blockdev_shutdown(params):
163
    """Shutdown a block device.
164

    
165
    """
166
    bdev_s = params[0]
167
    bdev = objects.Disk.FromDict(bdev_s)
168
    if bdev is None:
169
      raise ValueError("can't unserialize data!")
170
    return backend.BlockdevShutdown(bdev)
171

    
172
  @staticmethod
173
  def perspective_blockdev_addchildren(params):
174
    """Add a child to a mirror device.
175

    
176
    Note: this is only valid for mirror devices. It's the caller's duty
177
    to send a correct disk, otherwise we raise an error.
178

    
179
    """
180
    bdev_s, ndev_s = params
181
    bdev = objects.Disk.FromDict(bdev_s)
182
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183
    if bdev is None or ndevs.count(None) > 0:
184
      raise ValueError("can't unserialize data!")
185
    return backend.BlockdevAddchildren(bdev, ndevs)
186

    
187
  @staticmethod
188
  def perspective_blockdev_removechildren(params):
189
    """Remove a child from a mirror device.
190

    
191
    This is only valid for mirror devices, of course. It's the callers
192
    duty to send a correct disk, otherwise we raise an error.
193

    
194
    """
195
    bdev_s, ndev_s = params
196
    bdev = objects.Disk.FromDict(bdev_s)
197
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
198
    if bdev is None or ndevs.count(None) > 0:
199
      raise ValueError("can't unserialize data!")
200
    return backend.BlockdevRemovechildren(bdev, ndevs)
201

    
202
  @staticmethod
203
  def perspective_blockdev_getmirrorstatus(params):
204
    """Return the mirror status for a list of disks.
205

    
206
    """
207
    disks = [objects.Disk.FromDict(dsk_s)
208
             for dsk_s in params]
209
    return [status.ToDict()
210
            for status in backend.BlockdevGetmirrorstatus(disks)]
211

    
212
  @staticmethod
213
  def perspective_blockdev_find(params):
214
    """Expose the FindBlockDevice functionality for a disk.
215

    
216
    This will try to find but not activate a disk.
217

    
218
    """
219
    disk = objects.Disk.FromDict(params[0])
220

    
221
    result = backend.BlockdevFind(disk)
222
    if result is None:
223
      return None
224

    
225
    return result.ToDict()
226

    
227
  @staticmethod
228
  def perspective_blockdev_snapshot(params):
229
    """Create a snapshot device.
230

    
231
    Note that this is only valid for LVM disks, if we get passed
232
    something else we raise an exception. The snapshot device can be
233
    remove by calling the generic block device remove call.
234

    
235
    """
236
    cfbd = objects.Disk.FromDict(params[0])
237
    return backend.BlockdevSnapshot(cfbd)
238

    
239
  @staticmethod
240
  def perspective_blockdev_grow(params):
241
    """Grow a stack of devices.
242

    
243
    """
244
    cfbd = objects.Disk.FromDict(params[0])
245
    amount = params[1]
246
    return backend.BlockdevGrow(cfbd, amount)
247

    
248
  @staticmethod
249
  def perspective_blockdev_close(params):
250
    """Closes the given block devices.
251

    
252
    """
253
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
254
    return backend.BlockdevClose(params[0], disks)
255

    
256
  @staticmethod
257
  def perspective_blockdev_getsize(params):
258
    """Compute the sizes of the given block devices.
259

    
260
    """
261
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
262
    return backend.BlockdevGetsize(disks)
263

    
264
  @staticmethod
265
  def perspective_blockdev_export(params):
266
    """Compute the sizes of the given block devices.
267

    
268
    """
269
    disk = objects.Disk.FromDict(params[0])
270
    dest_node, dest_path, cluster_name = params[1:]
271
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
272

    
273
  # blockdev/drbd specific methods ----------
274

    
275
  @staticmethod
276
  def perspective_drbd_disconnect_net(params):
277
    """Disconnects the network connection of drbd disks.
278

    
279
    Note that this is only valid for drbd disks, so the members of the
280
    disk list must all be drbd devices.
281

    
282
    """
283
    nodes_ip, disks = params
284
    disks = [objects.Disk.FromDict(cf) for cf in disks]
285
    return backend.DrbdDisconnectNet(nodes_ip, disks)
286

    
287
  @staticmethod
288
  def perspective_drbd_attach_net(params):
289
    """Attaches the network connection of drbd disks.
290

    
291
    Note that this is only valid for drbd disks, so the members of the
292
    disk list must all be drbd devices.
293

    
294
    """
295
    nodes_ip, disks, instance_name, multimaster = params
296
    disks = [objects.Disk.FromDict(cf) for cf in disks]
297
    return backend.DrbdAttachNet(nodes_ip, disks,
298
                                     instance_name, multimaster)
299

    
300
  @staticmethod
301
  def perspective_drbd_wait_sync(params):
302
    """Wait until DRBD disks are synched.
303

    
304
    Note that this is only valid for drbd disks, so the members of the
305
    disk list must all be drbd devices.
306

    
307
    """
308
    nodes_ip, disks = params
309
    disks = [objects.Disk.FromDict(cf) for cf in disks]
310
    return backend.DrbdWaitSync(nodes_ip, disks)
311

    
312
  # export/import  --------------------------
313

    
314
  @staticmethod
315
  def perspective_snapshot_export(params):
316
    """Export a given snapshot.
317

    
318
    """
319
    disk = objects.Disk.FromDict(params[0])
320
    dest_node = params[1]
321
    instance = objects.Instance.FromDict(params[2])
322
    cluster_name = params[3]
323
    dev_idx = params[4]
324
    return backend.ExportSnapshot(disk, dest_node, instance,
325
                                  cluster_name, dev_idx)
326

    
327
  @staticmethod
328
  def perspective_finalize_export(params):
329
    """Expose the finalize export functionality.
330

    
331
    """
332
    instance = objects.Instance.FromDict(params[0])
333
    snap_disks = [objects.Disk.FromDict(str_data)
334
                  for str_data in params[1]]
335
    return backend.FinalizeExport(instance, snap_disks)
336

    
337
  @staticmethod
338
  def perspective_export_info(params):
339
    """Query information about an existing export on this node.
340

    
341
    The given path may not contain an export, in which case we return
342
    None.
343

    
344
    """
345
    path = params[0]
346
    return backend.ExportInfo(path)
347

    
348
  @staticmethod
349
  def perspective_export_list(params):
350
    """List the available exports on this node.
351

    
352
    Note that as opposed to export_info, which may query data about an
353
    export in any path, this only queries the standard Ganeti path
354
    (constants.EXPORT_DIR).
355

    
356
    """
357
    return backend.ListExports()
358

    
359
  @staticmethod
360
  def perspective_export_remove(params):
361
    """Remove an export.
362

    
363
    """
364
    export = params[0]
365
    return backend.RemoveExport(export)
366

    
367
  # volume  --------------------------
368

    
369
  @staticmethod
370
  def perspective_lv_list(params):
371
    """Query the list of logical volumes in a given volume group.
372

    
373
    """
374
    vgname = params[0]
375
    return backend.GetVolumeList(vgname)
376

    
377
  @staticmethod
378
  def perspective_vg_list(params):
379
    """Query the list of volume groups.
380

    
381
    """
382
    return backend.ListVolumeGroups()
383

    
384
  # Storage --------------------------
385

    
386
  @staticmethod
387
  def perspective_storage_list(params):
388
    """Get list of storage units.
389

    
390
    """
391
    (su_name, su_args, name, fields) = params
392
    return storage.GetStorage(su_name, *su_args).List(name, fields)
393

    
394
  @staticmethod
395
  def perspective_storage_modify(params):
396
    """Modify a storage unit.
397

    
398
    """
399
    (su_name, su_args, name, changes) = params
400
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
401

    
402
  @staticmethod
403
  def perspective_storage_execute(params):
404
    """Execute an operation on a storage unit.
405

    
406
    """
407
    (su_name, su_args, name, op) = params
408
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
409

    
410
  # bridge  --------------------------
411

    
412
  @staticmethod
413
  def perspective_bridges_exist(params):
414
    """Check if all bridges given exist on this node.
415

    
416
    """
417
    bridges_list = params[0]
418
    return backend.BridgesExist(bridges_list)
419

    
420
  # instance  --------------------------
421

    
422
  @staticmethod
423
  def perspective_instance_os_add(params):
424
    """Install an OS on a given instance.
425

    
426
    """
427
    inst_s = params[0]
428
    inst = objects.Instance.FromDict(inst_s)
429
    reinstall = params[1]
430
    return backend.InstanceOsAdd(inst, reinstall)
431

    
432
  @staticmethod
433
  def perspective_instance_run_rename(params):
434
    """Runs the OS rename script for an instance.
435

    
436
    """
437
    inst_s, old_name = params
438
    inst = objects.Instance.FromDict(inst_s)
439
    return backend.RunRenameInstance(inst, old_name)
440

    
441
  @staticmethod
442
  def perspective_instance_os_import(params):
443
    """Run the import function of an OS onto a given instance.
444

    
445
    """
446
    inst_s, src_node, src_images, cluster_name = params
447
    inst = objects.Instance.FromDict(inst_s)
448
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
449
                                        cluster_name)
450

    
451
  @staticmethod
452
  def perspective_instance_shutdown(params):
453
    """Shutdown an instance.
454

    
455
    """
456
    instance = objects.Instance.FromDict(params[0])
457
    timeout = params[1]
458
    return backend.InstanceShutdown(instance, timeout)
459

    
460
  @staticmethod
461
  def perspective_instance_start(params):
462
    """Start an instance.
463

    
464
    """
465
    instance = objects.Instance.FromDict(params[0])
466
    return backend.StartInstance(instance)
467

    
468
  @staticmethod
469
  def perspective_migration_info(params):
470
    """Gather information about an instance to be migrated.
471

    
472
    """
473
    instance = objects.Instance.FromDict(params[0])
474
    return backend.MigrationInfo(instance)
475

    
476
  @staticmethod
477
  def perspective_accept_instance(params):
478
    """Prepare the node to accept an instance.
479

    
480
    """
481
    instance, info, target = params
482
    instance = objects.Instance.FromDict(instance)
483
    return backend.AcceptInstance(instance, info, target)
484

    
485
  @staticmethod
486
  def perspective_finalize_migration(params):
487
    """Finalize the instance migration.
488

    
489
    """
490
    instance, info, success = params
491
    instance = objects.Instance.FromDict(instance)
492
    return backend.FinalizeMigration(instance, info, success)
493

    
494
  @staticmethod
495
  def perspective_instance_migrate(params):
496
    """Migrates an instance.
497

    
498
    """
499
    instance, target, live = params
500
    instance = objects.Instance.FromDict(instance)
501
    return backend.MigrateInstance(instance, target, live)
502

    
503
  @staticmethod
504
  def perspective_instance_reboot(params):
505
    """Reboot an instance.
506

    
507
    """
508
    instance = objects.Instance.FromDict(params[0])
509
    reboot_type = params[1]
510
    shutdown_timeout = params[2]
511
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
512

    
513
  @staticmethod
514
  def perspective_instance_info(params):
515
    """Query instance information.
516

    
517
    """
518
    return backend.GetInstanceInfo(params[0], params[1])
519

    
520
  @staticmethod
521
  def perspective_instance_migratable(params):
522
    """Query whether the specified instance can be migrated.
523

    
524
    """
525
    instance = objects.Instance.FromDict(params[0])
526
    return backend.GetInstanceMigratable(instance)
527

    
528
  @staticmethod
529
  def perspective_all_instances_info(params):
530
    """Query information about all instances.
531

    
532
    """
533
    return backend.GetAllInstancesInfo(params[0])
534

    
535
  @staticmethod
536
  def perspective_instance_list(params):
537
    """Query the list of running instances.
538

    
539
    """
540
    return backend.GetInstanceList(params[0])
541

    
542
  # node --------------------------
543

    
544
  @staticmethod
545
  def perspective_node_tcp_ping(params):
546
    """Do a TcpPing on the remote node.
547

    
548
    """
549
    return utils.TcpPing(params[1], params[2], timeout=params[3],
550
                         live_port_needed=params[4], source=params[0])
551

    
552
  @staticmethod
553
  def perspective_node_has_ip_address(params):
554
    """Checks if a node has the given ip address.
555

    
556
    """
557
    return utils.OwnIpAddress(params[0])
558

    
559
  @staticmethod
560
  def perspective_node_info(params):
561
    """Query node information.
562

    
563
    """
564
    vgname, hypervisor_type = params
565
    return backend.GetNodeInfo(vgname, hypervisor_type)
566

    
567
  @staticmethod
568
  def perspective_node_add(params):
569
    """Complete the registration of this node in the cluster.
570

    
571
    """
572
    return backend.AddNode(params[0], params[1], params[2],
573
                           params[3], params[4], params[5])
574

    
575
  @staticmethod
576
  def perspective_node_verify(params):
577
    """Run a verify sequence on this node.
578

    
579
    """
580
    return backend.VerifyNode(params[0], params[1])
581

    
582
  @staticmethod
583
  def perspective_node_start_master(params):
584
    """Promote this node to master status.
585

    
586
    """
587
    return backend.StartMaster(params[0], params[1])
588

    
589
  @staticmethod
590
  def perspective_node_stop_master(params):
591
    """Demote this node from master status.
592

    
593
    """
594
    return backend.StopMaster(params[0])
595

    
596
  @staticmethod
597
  def perspective_node_leave_cluster(params):
598
    """Cleanup after leaving a cluster.
599

    
600
    """
601
    return backend.LeaveCluster(params[0])
602

    
603
  @staticmethod
604
  def perspective_node_volumes(params):
605
    """Query the list of all logical volume groups.
606

    
607
    """
608
    return backend.NodeVolumes()
609

    
610
  @staticmethod
611
  def perspective_node_demote_from_mc(params):
612
    """Demote a node from the master candidate role.
613

    
614
    """
615
    return backend.DemoteFromMC()
616

    
617

    
618
  @staticmethod
619
  def perspective_node_powercycle(params):
620
    """Tries to powercycle the nod.
621

    
622
    """
623
    hypervisor_type = params[0]
624
    return backend.PowercycleNode(hypervisor_type)
625

    
626

    
627
  # cluster --------------------------
628

    
629
  @staticmethod
630
  def perspective_version(params):
631
    """Query version information.
632

    
633
    """
634
    return constants.PROTOCOL_VERSION
635

    
636
  @staticmethod
637
  def perspective_upload_file(params):
638
    """Upload a file.
639

    
640
    Note that the backend implementation imposes strict rules on which
641
    files are accepted.
642

    
643
    """
644
    return backend.UploadFile(*params)
645

    
646
  @staticmethod
647
  def perspective_master_info(params):
648
    """Query master information.
649

    
650
    """
651
    return backend.GetMasterInfo()
652

    
653
  @staticmethod
654
  def perspective_write_ssconf_files(params):
655
    """Write ssconf files.
656

    
657
    """
658
    (values,) = params
659
    return backend.WriteSsconfFiles(values)
660

    
661
  # os -----------------------
662

    
663
  @staticmethod
664
  def perspective_os_diagnose(params):
665
    """Query detailed information about existing OSes.
666

    
667
    """
668
    return backend.DiagnoseOS()
669

    
670
  @staticmethod
671
  def perspective_os_get(params):
672
    """Query information about a given OS.
673

    
674
    """
675
    name = params[0]
676
    os_obj = backend.OSFromDisk(name)
677
    return os_obj.ToDict()
678

    
679
  # hooks -----------------------
680

    
681
  @staticmethod
682
  def perspective_hooks_runner(params):
683
    """Run hook scripts.
684

    
685
    """
686
    hpath, phase, env = params
687
    hr = backend.HooksRunner()
688
    return hr.RunHooks(hpath, phase, env)
689

    
690
  # iallocator -----------------
691

    
692
  @staticmethod
693
  def perspective_iallocator_runner(params):
694
    """Run an iallocator script.
695

    
696
    """
697
    name, idata = params
698
    iar = backend.IAllocatorRunner()
699
    return iar.Run(name, idata)
700

    
701
  # test -----------------------
702

    
703
  @staticmethod
704
  def perspective_test_delay(params):
705
    """Run test delay.
706

    
707
    """
708
    duration = params[0]
709
    status, rval = utils.TestDelay(duration)
710
    if not status:
711
      raise backend.RPCFail(rval)
712
    return rval
713

    
714
  # file storage ---------------
715

    
716
  @staticmethod
717
  def perspective_file_storage_dir_create(params):
718
    """Create the file storage directory.
719

    
720
    """
721
    file_storage_dir = params[0]
722
    return backend.CreateFileStorageDir(file_storage_dir)
723

    
724
  @staticmethod
725
  def perspective_file_storage_dir_remove(params):
726
    """Remove the file storage directory.
727

    
728
    """
729
    file_storage_dir = params[0]
730
    return backend.RemoveFileStorageDir(file_storage_dir)
731

    
732
  @staticmethod
733
  def perspective_file_storage_dir_rename(params):
734
    """Rename the file storage directory.
735

    
736
    """
737
    old_file_storage_dir = params[0]
738
    new_file_storage_dir = params[1]
739
    return backend.RenameFileStorageDir(old_file_storage_dir,
740
                                        new_file_storage_dir)
741

    
742
  # jobs ------------------------
743

    
744
  @staticmethod
745
  @_RequireJobQueueLock
746
  def perspective_jobqueue_update(params):
747
    """Update job queue.
748

    
749
    """
750
    (file_name, content) = params
751
    return backend.JobQueueUpdate(file_name, content)
752

    
753
  @staticmethod
754
  @_RequireJobQueueLock
755
  def perspective_jobqueue_purge(params):
756
    """Purge job queue.
757

    
758
    """
759
    return backend.JobQueuePurge()
760

    
761
  @staticmethod
762
  @_RequireJobQueueLock
763
  def perspective_jobqueue_rename(params):
764
    """Rename a job queue file.
765

    
766
    """
767
    # TODO: What if a file fails to rename?
768
    return [backend.JobQueueRename(old, new) for old, new in params]
769

    
770
  @staticmethod
771
  def perspective_jobqueue_set_drain(params):
772
    """Set/unset the queue drain flag.
773

    
774
    """
775
    drain_flag = params[0]
776
    return backend.JobQueueSetDrainFlag(drain_flag)
777

    
778

    
779
  # hypervisor ---------------
780

    
781
  @staticmethod
782
  def perspective_hypervisor_validate_params(params):
783
    """Validate the hypervisor parameters.
784

    
785
    """
786
    (hvname, hvparams) = params
787
    return backend.ValidateHVParams(hvname, hvparams)
788

    
789

    
790
def CheckNoded(_, args):
791
  """Initial checks whether to run or exit with a failure.
792

    
793
  """
794
  if args: # noded doesn't take any arguments
795
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
796
                          sys.argv[0])
797
    sys.exit(constants.EXIT_FAILURE)
798

    
799

    
800
def ExecNoded(options, args):
801
  """Main node daemon function, executed with the PID file held.
802

    
803
  """
804
  global queue_lock # pylint: disable-msg=W0603
805

    
806
  # Read SSL certificate
807
  if options.ssl:
808
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
809
                                    ssl_cert_path=options.ssl_cert)
810
  else:
811
    ssl_params = None
812

    
813
  # Prepare job queue
814
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
815

    
816
  mainloop = daemon.Mainloop()
817
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
818
                          ssl_params=ssl_params, ssl_verify_peer=True)
819
  server.Start()
820
  try:
821
    mainloop.Run()
822
  finally:
823
    server.Stop()
824

    
825

    
826
def main():
827
  """Main function for the node daemon.
828

    
829
  """
830
  parser = OptionParser(description="Ganeti node daemon",
831
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
832
                        version="%%prog (ganeti) %s" %
833
                        constants.RELEASE_VERSION)
834
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
835
  dirs.append((constants.LOG_OS_DIR, 0750))
836
  dirs.append((constants.LOCK_DIR, 1777))
837
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
838

    
839

    
840
if __name__ == '__main__':
841
  main()