Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 30e4e741

History | View | Annotate | Download (21.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48

    
49
import ganeti.http.server # pylint: disable-msg=W0611
50

    
51

    
52
queue_lock = None
53

    
54

    
55
def _RequireJobQueueLock(fn):
56
  """Decorator for job queue manipulating functions.
57

    
58
  """
59
  QUEUE_LOCK_TIMEOUT = 10
60

    
61
  def wrapper(*args, **kwargs):
62
    # Locking in exclusive, blocking mode because there could be several
63
    # children running at the same time. Waiting up to 10 seconds.
64
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
65
    try:
66
      return fn(*args, **kwargs)
67
    finally:
68
      queue_lock.Unlock()
69

    
70
  return wrapper
71

    
72

    
73
class NodeHttpServer(http.server.HttpServer):
74
  """The server implementation.
75

    
76
  This class holds all methods exposed over the RPC interface.
77

    
78
  """
79
  # too many public methods, and unused args - all methods get params
80
  # due to the API
81
  # pylint: disable-msg=R0904,W0613
82
  def __init__(self, *args, **kwargs):
83
    http.server.HttpServer.__init__(self, *args, **kwargs)
84
    self.noded_pid = os.getpid()
85

    
86
  def HandleRequest(self, req):
87
    """Handle a request.
88

    
89
    """
90
    if req.request_method.upper() != http.HTTP_PUT:
91
      raise http.HttpBadRequest()
92

    
93
    path = req.request_path
94
    if path.startswith("/"):
95
      path = path[1:]
96

    
97
    method = getattr(self, "perspective_%s" % path, None)
98
    if method is None:
99
      raise http.HttpNotFound()
100

    
101
    try:
102
      rvalue = method(req.request_body)
103
      return True, rvalue
104

    
105
    except backend.RPCFail, err:
106
      # our custom failure exception; str(err) works fine if the
107
      # exception was constructed with a single argument, and in
108
      # this case, err.message == err.args[0] == str(err)
109
      return (False, str(err))
110
    except errors.QuitGanetiException, err:
111
      # Tell parent to quit
112
      logging.info("Shutting down the node daemon, arguments: %s",
113
                   str(err.args))
114
      os.kill(self.noded_pid, signal.SIGTERM)
115
      # And return the error's arguments, which must be already in
116
      # correct tuple format
117
      return err.args
118
    except Exception, err:
119
      logging.exception("Error in RPC call")
120
      return False, "Error while executing backend function: %s" % str(err)
121

    
122
  # the new block devices  --------------------------
123

    
124
  @staticmethod
125
  def perspective_blockdev_create(params):
126
    """Create a block device.
127

    
128
    """
129
    bdev_s, size, owner, on_primary, info = params
130
    bdev = objects.Disk.FromDict(bdev_s)
131
    if bdev is None:
132
      raise ValueError("can't unserialize data!")
133
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
134

    
135
  @staticmethod
136
  def perspective_blockdev_remove(params):
137
    """Remove a block device.
138

    
139
    """
140
    bdev_s = params[0]
141
    bdev = objects.Disk.FromDict(bdev_s)
142
    return backend.BlockdevRemove(bdev)
143

    
144
  @staticmethod
145
  def perspective_blockdev_rename(params):
146
    """Remove a block device.
147

    
148
    """
149
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
150
    return backend.BlockdevRename(devlist)
151

    
152
  @staticmethod
153
  def perspective_blockdev_assemble(params):
154
    """Assemble a block device.
155

    
156
    """
157
    bdev_s, owner, on_primary = params
158
    bdev = objects.Disk.FromDict(bdev_s)
159
    if bdev is None:
160
      raise ValueError("can't unserialize data!")
161
    return backend.BlockdevAssemble(bdev, owner, on_primary)
162

    
163
  @staticmethod
164
  def perspective_blockdev_shutdown(params):
165
    """Shutdown a block device.
166

    
167
    """
168
    bdev_s = params[0]
169
    bdev = objects.Disk.FromDict(bdev_s)
170
    if bdev is None:
171
      raise ValueError("can't unserialize data!")
172
    return backend.BlockdevShutdown(bdev)
173

    
174
  @staticmethod
175
  def perspective_blockdev_addchildren(params):
176
    """Add a child to a mirror device.
177

    
178
    Note: this is only valid for mirror devices. It's the caller's duty
179
    to send a correct disk, otherwise we raise an error.
180

    
181
    """
182
    bdev_s, ndev_s = params
183
    bdev = objects.Disk.FromDict(bdev_s)
184
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
185
    if bdev is None or ndevs.count(None) > 0:
186
      raise ValueError("can't unserialize data!")
187
    return backend.BlockdevAddchildren(bdev, ndevs)
188

    
189
  @staticmethod
190
  def perspective_blockdev_removechildren(params):
191
    """Remove a child from a mirror device.
192

    
193
    This is only valid for mirror devices, of course. It's the callers
194
    duty to send a correct disk, otherwise we raise an error.
195

    
196
    """
197
    bdev_s, ndev_s = params
198
    bdev = objects.Disk.FromDict(bdev_s)
199
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
200
    if bdev is None or ndevs.count(None) > 0:
201
      raise ValueError("can't unserialize data!")
202
    return backend.BlockdevRemovechildren(bdev, ndevs)
203

    
204
  @staticmethod
205
  def perspective_blockdev_getmirrorstatus(params):
206
    """Return the mirror status for a list of disks.
207

    
208
    """
209
    disks = [objects.Disk.FromDict(dsk_s)
210
             for dsk_s in params]
211
    return [status.ToDict()
212
            for status in backend.BlockdevGetmirrorstatus(disks)]
213

    
214
  @staticmethod
215
  def perspective_blockdev_find(params):
216
    """Expose the FindBlockDevice functionality for a disk.
217

    
218
    This will try to find but not activate a disk.
219

    
220
    """
221
    disk = objects.Disk.FromDict(params[0])
222

    
223
    result = backend.BlockdevFind(disk)
224
    if result is None:
225
      return None
226

    
227
    return result.ToDict()
228

    
229
  @staticmethod
230
  def perspective_blockdev_snapshot(params):
231
    """Create a snapshot device.
232

    
233
    Note that this is only valid for LVM disks, if we get passed
234
    something else we raise an exception. The snapshot device can be
235
    remove by calling the generic block device remove call.
236

    
237
    """
238
    cfbd = objects.Disk.FromDict(params[0])
239
    return backend.BlockdevSnapshot(cfbd)
240

    
241
  @staticmethod
242
  def perspective_blockdev_grow(params):
243
    """Grow a stack of devices.
244

    
245
    """
246
    cfbd = objects.Disk.FromDict(params[0])
247
    amount = params[1]
248
    return backend.BlockdevGrow(cfbd, amount)
249

    
250
  @staticmethod
251
  def perspective_blockdev_close(params):
252
    """Closes the given block devices.
253

    
254
    """
255
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
256
    return backend.BlockdevClose(params[0], disks)
257

    
258
  @staticmethod
259
  def perspective_blockdev_getsize(params):
260
    """Compute the sizes of the given block devices.
261

    
262
    """
263
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
264
    return backend.BlockdevGetsize(disks)
265

    
266
  @staticmethod
267
  def perspective_blockdev_export(params):
268
    """Compute the sizes of the given block devices.
269

    
270
    """
271
    disk = objects.Disk.FromDict(params[0])
272
    dest_node, dest_path, cluster_name = params[1:]
273
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
274

    
275
  # blockdev/drbd specific methods ----------
276

    
277
  @staticmethod
278
  def perspective_drbd_disconnect_net(params):
279
    """Disconnects the network connection of drbd disks.
280

    
281
    Note that this is only valid for drbd disks, so the members of the
282
    disk list must all be drbd devices.
283

    
284
    """
285
    nodes_ip, disks = params
286
    disks = [objects.Disk.FromDict(cf) for cf in disks]
287
    return backend.DrbdDisconnectNet(nodes_ip, disks)
288

    
289
  @staticmethod
290
  def perspective_drbd_attach_net(params):
291
    """Attaches the network connection of drbd disks.
292

    
293
    Note that this is only valid for drbd disks, so the members of the
294
    disk list must all be drbd devices.
295

    
296
    """
297
    nodes_ip, disks, instance_name, multimaster = params
298
    disks = [objects.Disk.FromDict(cf) for cf in disks]
299
    return backend.DrbdAttachNet(nodes_ip, disks,
300
                                     instance_name, multimaster)
301

    
302
  @staticmethod
303
  def perspective_drbd_wait_sync(params):
304
    """Wait until DRBD disks are synched.
305

    
306
    Note that this is only valid for drbd disks, so the members of the
307
    disk list must all be drbd devices.
308

    
309
    """
310
    nodes_ip, disks = params
311
    disks = [objects.Disk.FromDict(cf) for cf in disks]
312
    return backend.DrbdWaitSync(nodes_ip, disks)
313

    
314
  # export/import  --------------------------
315

    
316
  @staticmethod
317
  def perspective_snapshot_export(params):
318
    """Export a given snapshot.
319

    
320
    """
321
    disk = objects.Disk.FromDict(params[0])
322
    dest_node = params[1]
323
    instance = objects.Instance.FromDict(params[2])
324
    cluster_name = params[3]
325
    dev_idx = params[4]
326
    return backend.ExportSnapshot(disk, dest_node, instance,
327
                                  cluster_name, dev_idx)
328

    
329
  @staticmethod
330
  def perspective_finalize_export(params):
331
    """Expose the finalize export functionality.
332

    
333
    """
334
    instance = objects.Instance.FromDict(params[0])
335
    snap_disks = [objects.Disk.FromDict(str_data)
336
                  for str_data in params[1]]
337
    return backend.FinalizeExport(instance, snap_disks)
338

    
339
  @staticmethod
340
  def perspective_export_info(params):
341
    """Query information about an existing export on this node.
342

    
343
    The given path may not contain an export, in which case we return
344
    None.
345

    
346
    """
347
    path = params[0]
348
    return backend.ExportInfo(path)
349

    
350
  @staticmethod
351
  def perspective_export_list(params):
352
    """List the available exports on this node.
353

    
354
    Note that as opposed to export_info, which may query data about an
355
    export in any path, this only queries the standard Ganeti path
356
    (constants.EXPORT_DIR).
357

    
358
    """
359
    return backend.ListExports()
360

    
361
  @staticmethod
362
  def perspective_export_remove(params):
363
    """Remove an export.
364

    
365
    """
366
    export = params[0]
367
    return backend.RemoveExport(export)
368

    
369
  # volume  --------------------------
370

    
371
  @staticmethod
372
  def perspective_lv_list(params):
373
    """Query the list of logical volumes in a given volume group.
374

    
375
    """
376
    vgname = params[0]
377
    return backend.GetVolumeList(vgname)
378

    
379
  @staticmethod
380
  def perspective_vg_list(params):
381
    """Query the list of volume groups.
382

    
383
    """
384
    return backend.ListVolumeGroups()
385

    
386
  # Storage --------------------------
387

    
388
  @staticmethod
389
  def perspective_storage_list(params):
390
    """Get list of storage units.
391

    
392
    """
393
    (su_name, su_args, name, fields) = params
394
    return storage.GetStorage(su_name, *su_args).List(name, fields)
395

    
396
  @staticmethod
397
  def perspective_storage_modify(params):
398
    """Modify a storage unit.
399

    
400
    """
401
    (su_name, su_args, name, changes) = params
402
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
403

    
404
  @staticmethod
405
  def perspective_storage_execute(params):
406
    """Execute an operation on a storage unit.
407

    
408
    """
409
    (su_name, su_args, name, op) = params
410
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
411

    
412
  # bridge  --------------------------
413

    
414
  @staticmethod
415
  def perspective_bridges_exist(params):
416
    """Check if all bridges given exist on this node.
417

    
418
    """
419
    bridges_list = params[0]
420
    return backend.BridgesExist(bridges_list)
421

    
422
  # instance  --------------------------
423

    
424
  @staticmethod
425
  def perspective_instance_os_add(params):
426
    """Install an OS on a given instance.
427

    
428
    """
429
    inst_s = params[0]
430
    inst = objects.Instance.FromDict(inst_s)
431
    reinstall = params[1]
432
    return backend.InstanceOsAdd(inst, reinstall)
433

    
434
  @staticmethod
435
  def perspective_instance_run_rename(params):
436
    """Runs the OS rename script for an instance.
437

    
438
    """
439
    inst_s, old_name = params
440
    inst = objects.Instance.FromDict(inst_s)
441
    return backend.RunRenameInstance(inst, old_name)
442

    
443
  @staticmethod
444
  def perspective_instance_os_import(params):
445
    """Run the import function of an OS onto a given instance.
446

    
447
    """
448
    inst_s, src_node, src_images, cluster_name = params
449
    inst = objects.Instance.FromDict(inst_s)
450
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
451
                                        cluster_name)
452

    
453
  @staticmethod
454
  def perspective_instance_shutdown(params):
455
    """Shutdown an instance.
456

    
457
    """
458
    instance = objects.Instance.FromDict(params[0])
459
    timeout = params[1]
460
    return backend.InstanceShutdown(instance, timeout)
461

    
462
  @staticmethod
463
  def perspective_instance_start(params):
464
    """Start an instance.
465

    
466
    """
467
    instance = objects.Instance.FromDict(params[0])
468
    return backend.StartInstance(instance)
469

    
470
  @staticmethod
471
  def perspective_migration_info(params):
472
    """Gather information about an instance to be migrated.
473

    
474
    """
475
    instance = objects.Instance.FromDict(params[0])
476
    return backend.MigrationInfo(instance)
477

    
478
  @staticmethod
479
  def perspective_accept_instance(params):
480
    """Prepare the node to accept an instance.
481

    
482
    """
483
    instance, info, target = params
484
    instance = objects.Instance.FromDict(instance)
485
    return backend.AcceptInstance(instance, info, target)
486

    
487
  @staticmethod
488
  def perspective_finalize_migration(params):
489
    """Finalize the instance migration.
490

    
491
    """
492
    instance, info, success = params
493
    instance = objects.Instance.FromDict(instance)
494
    return backend.FinalizeMigration(instance, info, success)
495

    
496
  @staticmethod
497
  def perspective_instance_migrate(params):
498
    """Migrates an instance.
499

    
500
    """
501
    instance, target, live = params
502
    instance = objects.Instance.FromDict(instance)
503
    return backend.MigrateInstance(instance, target, live)
504

    
505
  @staticmethod
506
  def perspective_instance_reboot(params):
507
    """Reboot an instance.
508

    
509
    """
510
    instance = objects.Instance.FromDict(params[0])
511
    reboot_type = params[1]
512
    shutdown_timeout = params[2]
513
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
514

    
515
  @staticmethod
516
  def perspective_instance_info(params):
517
    """Query instance information.
518

    
519
    """
520
    return backend.GetInstanceInfo(params[0], params[1])
521

    
522
  @staticmethod
523
  def perspective_instance_migratable(params):
524
    """Query whether the specified instance can be migrated.
525

    
526
    """
527
    instance = objects.Instance.FromDict(params[0])
528
    return backend.GetInstanceMigratable(instance)
529

    
530
  @staticmethod
531
  def perspective_all_instances_info(params):
532
    """Query information about all instances.
533

    
534
    """
535
    return backend.GetAllInstancesInfo(params[0])
536

    
537
  @staticmethod
538
  def perspective_instance_list(params):
539
    """Query the list of running instances.
540

    
541
    """
542
    return backend.GetInstanceList(params[0])
543

    
544
  # node --------------------------
545

    
546
  @staticmethod
547
  def perspective_node_tcp_ping(params):
548
    """Do a TcpPing on the remote node.
549

    
550
    """
551
    return utils.TcpPing(params[1], params[2], timeout=params[3],
552
                         live_port_needed=params[4], source=params[0])
553

    
554
  @staticmethod
555
  def perspective_node_has_ip_address(params):
556
    """Checks if a node has the given ip address.
557

    
558
    """
559
    return utils.OwnIpAddress(params[0])
560

    
561
  @staticmethod
562
  def perspective_node_info(params):
563
    """Query node information.
564

    
565
    """
566
    vgname, hypervisor_type = params
567
    return backend.GetNodeInfo(vgname, hypervisor_type)
568

    
569
  @staticmethod
570
  def perspective_node_add(params):
571
    """Complete the registration of this node in the cluster.
572

    
573
    """
574
    return backend.AddNode(params[0], params[1], params[2],
575
                           params[3], params[4], params[5])
576

    
577
  @staticmethod
578
  def perspective_node_verify(params):
579
    """Run a verify sequence on this node.
580

    
581
    """
582
    return backend.VerifyNode(params[0], params[1])
583

    
584
  @staticmethod
585
  def perspective_node_start_master(params):
586
    """Promote this node to master status.
587

    
588
    """
589
    return backend.StartMaster(params[0], params[1])
590

    
591
  @staticmethod
592
  def perspective_node_stop_master(params):
593
    """Demote this node from master status.
594

    
595
    """
596
    return backend.StopMaster(params[0])
597

    
598
  @staticmethod
599
  def perspective_node_leave_cluster(params):
600
    """Cleanup after leaving a cluster.
601

    
602
    """
603
    return backend.LeaveCluster(params[0])
604

    
605
  @staticmethod
606
  def perspective_node_volumes(params):
607
    """Query the list of all logical volume groups.
608

    
609
    """
610
    return backend.NodeVolumes()
611

    
612
  @staticmethod
613
  def perspective_node_demote_from_mc(params):
614
    """Demote a node from the master candidate role.
615

    
616
    """
617
    return backend.DemoteFromMC()
618

    
619

    
620
  @staticmethod
621
  def perspective_node_powercycle(params):
622
    """Tries to powercycle the nod.
623

    
624
    """
625
    hypervisor_type = params[0]
626
    return backend.PowercycleNode(hypervisor_type)
627

    
628

    
629
  # cluster --------------------------
630

    
631
  @staticmethod
632
  def perspective_version(params):
633
    """Query version information.
634

    
635
    """
636
    return constants.PROTOCOL_VERSION
637

    
638
  @staticmethod
639
  def perspective_upload_file(params):
640
    """Upload a file.
641

    
642
    Note that the backend implementation imposes strict rules on which
643
    files are accepted.
644

    
645
    """
646
    return backend.UploadFile(*params)
647

    
648
  @staticmethod
649
  def perspective_master_info(params):
650
    """Query master information.
651

    
652
    """
653
    return backend.GetMasterInfo()
654

    
655
  @staticmethod
656
  def perspective_write_ssconf_files(params):
657
    """Write ssconf files.
658

    
659
    """
660
    (values,) = params
661
    return backend.WriteSsconfFiles(values)
662

    
663
  # os -----------------------
664

    
665
  @staticmethod
666
  def perspective_os_diagnose(params):
667
    """Query detailed information about existing OSes.
668

    
669
    """
670
    return backend.DiagnoseOS()
671

    
672
  @staticmethod
673
  def perspective_os_get(params):
674
    """Query information about a given OS.
675

    
676
    """
677
    name = params[0]
678
    os_obj = backend.OSFromDisk(name)
679
    return os_obj.ToDict()
680

    
681
  # hooks -----------------------
682

    
683
  @staticmethod
684
  def perspective_hooks_runner(params):
685
    """Run hook scripts.
686

    
687
    """
688
    hpath, phase, env = params
689
    hr = backend.HooksRunner()
690
    return hr.RunHooks(hpath, phase, env)
691

    
692
  # iallocator -----------------
693

    
694
  @staticmethod
695
  def perspective_iallocator_runner(params):
696
    """Run an iallocator script.
697

    
698
    """
699
    name, idata = params
700
    iar = backend.IAllocatorRunner()
701
    return iar.Run(name, idata)
702

    
703
  # test -----------------------
704

    
705
  @staticmethod
706
  def perspective_test_delay(params):
707
    """Run test delay.
708

    
709
    """
710
    duration = params[0]
711
    status, rval = utils.TestDelay(duration)
712
    if not status:
713
      raise backend.RPCFail(rval)
714
    return rval
715

    
716
  # file storage ---------------
717

    
718
  @staticmethod
719
  def perspective_file_storage_dir_create(params):
720
    """Create the file storage directory.
721

    
722
    """
723
    file_storage_dir = params[0]
724
    return backend.CreateFileStorageDir(file_storage_dir)
725

    
726
  @staticmethod
727
  def perspective_file_storage_dir_remove(params):
728
    """Remove the file storage directory.
729

    
730
    """
731
    file_storage_dir = params[0]
732
    return backend.RemoveFileStorageDir(file_storage_dir)
733

    
734
  @staticmethod
735
  def perspective_file_storage_dir_rename(params):
736
    """Rename the file storage directory.
737

    
738
    """
739
    old_file_storage_dir = params[0]
740
    new_file_storage_dir = params[1]
741
    return backend.RenameFileStorageDir(old_file_storage_dir,
742
                                        new_file_storage_dir)
743

    
744
  # jobs ------------------------
745

    
746
  @staticmethod
747
  @_RequireJobQueueLock
748
  def perspective_jobqueue_update(params):
749
    """Update job queue.
750

    
751
    """
752
    (file_name, content) = params
753
    return backend.JobQueueUpdate(file_name, content)
754

    
755
  @staticmethod
756
  @_RequireJobQueueLock
757
  def perspective_jobqueue_purge(params):
758
    """Purge job queue.
759

    
760
    """
761
    return backend.JobQueuePurge()
762

    
763
  @staticmethod
764
  @_RequireJobQueueLock
765
  def perspective_jobqueue_rename(params):
766
    """Rename a job queue file.
767

    
768
    """
769
    # TODO: What if a file fails to rename?
770
    return [backend.JobQueueRename(old, new) for old, new in params]
771

    
772
  @staticmethod
773
  def perspective_jobqueue_set_drain(params):
774
    """Set/unset the queue drain flag.
775

    
776
    """
777
    drain_flag = params[0]
778
    return backend.JobQueueSetDrainFlag(drain_flag)
779

    
780

    
781
  # hypervisor ---------------
782

    
783
  @staticmethod
784
  def perspective_hypervisor_validate_params(params):
785
    """Validate the hypervisor parameters.
786

    
787
    """
788
    (hvname, hvparams) = params
789
    return backend.ValidateHVParams(hvname, hvparams)
790

    
791

    
792
def CheckNoded(_, args):
793
  """Initial checks whether to run or exit with a failure.
794

    
795
  """
796
  if args: # noded doesn't take any arguments
797
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
798
                          sys.argv[0])
799
    sys.exit(constants.EXIT_FAILURE)
800

    
801

    
802
def ExecNoded(options, _):
803
  """Main node daemon function, executed with the PID file held.
804

    
805
  """
806
  global queue_lock # pylint: disable-msg=W0603
807

    
808
  # Read SSL certificate
809
  if options.ssl:
810
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
811
                                    ssl_cert_path=options.ssl_cert)
812
  else:
813
    ssl_params = None
814

    
815
  # Prepare job queue
816
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
817

    
818
  mainloop = daemon.Mainloop()
819
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
820
                          ssl_params=ssl_params, ssl_verify_peer=True)
821
  server.Start()
822
  try:
823
    mainloop.Run()
824
  finally:
825
    server.Stop()
826

    
827

    
828
def main():
829
  """Main function for the node daemon.
830

    
831
  """
832
  parser = OptionParser(description="Ganeti node daemon",
833
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
834
                        version="%%prog (ganeti) %s" %
835
                        constants.RELEASE_VERSION)
836
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
837
  dirs.append((constants.LOG_OS_DIR, 0750))
838
  dirs.append((constants.LOCK_DIR, 1777))
839
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
840

    
841

    
842
if __name__ == '__main__':
843
  main()