Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 4c32a8bd

History | View | Annotate | Download (23.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48

    
49
import ganeti.http.server # pylint: disable-msg=W0611
50

    
51

    
52
queue_lock = None
53

    
54

    
55
def _PrepareQueueLock():
56
  """Try to prepare the queue lock.
57

    
58
  @return: None for success, otherwise an exception object
59

    
60
  """
61
  global queue_lock # pylint: disable-msg=W0603
62

    
63
  if queue_lock is not None:
64
    return None
65

    
66
  # Prepare job queue
67
  try:
68
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
69
    return None
70
  except EnvironmentError, err:
71
    return err
72

    
73

    
74
def _RequireJobQueueLock(fn):
75
  """Decorator for job queue manipulating functions.
76

    
77
  """
78
  QUEUE_LOCK_TIMEOUT = 10
79

    
80
  def wrapper(*args, **kwargs):
81
    # Locking in exclusive, blocking mode because there could be several
82
    # children running at the same time. Waiting up to 10 seconds.
83
    if _PrepareQueueLock() is not None:
84
      raise errors.JobQueueError("Job queue failed initialization,"
85
                                 " cannot update jobs")
86
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
87
    try:
88
      return fn(*args, **kwargs)
89
    finally:
90
      queue_lock.Unlock()
91

    
92
  return wrapper
93

    
94

    
95
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
96
  """Custom Request Executor class that ensures NodeHttpServer children are
97
  locked in ram.
98

    
99
  """
100
  def __init__(self, *args, **kwargs):
101
    utils.Mlockall()
102

    
103
    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
104

    
105

    
106
class NodeHttpServer(http.server.HttpServer):
107
  """The server implementation.
108

    
109
  This class holds all methods exposed over the RPC interface.
110

    
111
  """
112
  # too many public methods, and unused args - all methods get params
113
  # due to the API
114
  # pylint: disable-msg=R0904,W0613
115
  def __init__(self, *args, **kwargs):
116
    http.server.HttpServer.__init__(self, *args, **kwargs)
117
    self.noded_pid = os.getpid()
118

    
119
  def HandleRequest(self, req):
120
    """Handle a request.
121

    
122
    """
123
    if req.request_method.upper() != http.HTTP_PUT:
124
      raise http.HttpBadRequest()
125

    
126
    path = req.request_path
127
    if path.startswith("/"):
128
      path = path[1:]
129

    
130
    method = getattr(self, "perspective_%s" % path, None)
131
    if method is None:
132
      raise http.HttpNotFound()
133

    
134
    try:
135
      rvalue = method(req.request_body)
136
      return True, rvalue
137

    
138
    except backend.RPCFail, err:
139
      # our custom failure exception; str(err) works fine if the
140
      # exception was constructed with a single argument, and in
141
      # this case, err.message == err.args[0] == str(err)
142
      return (False, str(err))
143
    except errors.QuitGanetiException, err:
144
      # Tell parent to quit
145
      logging.info("Shutting down the node daemon, arguments: %s",
146
                   str(err.args))
147
      os.kill(self.noded_pid, signal.SIGTERM)
148
      # And return the error's arguments, which must be already in
149
      # correct tuple format
150
      return err.args
151
    except Exception, err:
152
      logging.exception("Error in RPC call")
153
      return False, "Error while executing backend function: %s" % str(err)
154

    
155
  # the new block devices  --------------------------
156

    
157
  @staticmethod
158
  def perspective_blockdev_create(params):
159
    """Create a block device.
160

    
161
    """
162
    bdev_s, size, owner, on_primary, info = params
163
    bdev = objects.Disk.FromDict(bdev_s)
164
    if bdev is None:
165
      raise ValueError("can't unserialize data!")
166
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
167

    
168
  @staticmethod
169
  def perspective_blockdev_remove(params):
170
    """Remove a block device.
171

    
172
    """
173
    bdev_s = params[0]
174
    bdev = objects.Disk.FromDict(bdev_s)
175
    return backend.BlockdevRemove(bdev)
176

    
177
  @staticmethod
178
  def perspective_blockdev_rename(params):
179
    """Remove a block device.
180

    
181
    """
182
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
183
    return backend.BlockdevRename(devlist)
184

    
185
  @staticmethod
186
  def perspective_blockdev_assemble(params):
187
    """Assemble a block device.
188

    
189
    """
190
    bdev_s, owner, on_primary = params
191
    bdev = objects.Disk.FromDict(bdev_s)
192
    if bdev is None:
193
      raise ValueError("can't unserialize data!")
194
    return backend.BlockdevAssemble(bdev, owner, on_primary)
195

    
196
  @staticmethod
197
  def perspective_blockdev_shutdown(params):
198
    """Shutdown a block device.
199

    
200
    """
201
    bdev_s = params[0]
202
    bdev = objects.Disk.FromDict(bdev_s)
203
    if bdev is None:
204
      raise ValueError("can't unserialize data!")
205
    return backend.BlockdevShutdown(bdev)
206

    
207
  @staticmethod
208
  def perspective_blockdev_addchildren(params):
209
    """Add a child to a mirror device.
210

    
211
    Note: this is only valid for mirror devices. It's the caller's duty
212
    to send a correct disk, otherwise we raise an error.
213

    
214
    """
215
    bdev_s, ndev_s = params
216
    bdev = objects.Disk.FromDict(bdev_s)
217
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
218
    if bdev is None or ndevs.count(None) > 0:
219
      raise ValueError("can't unserialize data!")
220
    return backend.BlockdevAddchildren(bdev, ndevs)
221

    
222
  @staticmethod
223
  def perspective_blockdev_removechildren(params):
224
    """Remove a child from a mirror device.
225

    
226
    This is only valid for mirror devices, of course. It's the callers
227
    duty to send a correct disk, otherwise we raise an error.
228

    
229
    """
230
    bdev_s, ndev_s = params
231
    bdev = objects.Disk.FromDict(bdev_s)
232
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
233
    if bdev is None or ndevs.count(None) > 0:
234
      raise ValueError("can't unserialize data!")
235
    return backend.BlockdevRemovechildren(bdev, ndevs)
236

    
237
  @staticmethod
238
  def perspective_blockdev_getmirrorstatus(params):
239
    """Return the mirror status for a list of disks.
240

    
241
    """
242
    disks = [objects.Disk.FromDict(dsk_s)
243
             for dsk_s in params]
244
    return [status.ToDict()
245
            for status in backend.BlockdevGetmirrorstatus(disks)]
246

    
247
  @staticmethod
248
  def perspective_blockdev_find(params):
249
    """Expose the FindBlockDevice functionality for a disk.
250

    
251
    This will try to find but not activate a disk.
252

    
253
    """
254
    disk = objects.Disk.FromDict(params[0])
255

    
256
    result = backend.BlockdevFind(disk)
257
    if result is None:
258
      return None
259

    
260
    return result.ToDict()
261

    
262
  @staticmethod
263
  def perspective_blockdev_snapshot(params):
264
    """Create a snapshot device.
265

    
266
    Note that this is only valid for LVM disks, if we get passed
267
    something else we raise an exception. The snapshot device can be
268
    remove by calling the generic block device remove call.
269

    
270
    """
271
    cfbd = objects.Disk.FromDict(params[0])
272
    return backend.BlockdevSnapshot(cfbd)
273

    
274
  @staticmethod
275
  def perspective_blockdev_grow(params):
276
    """Grow a stack of devices.
277

    
278
    """
279
    cfbd = objects.Disk.FromDict(params[0])
280
    amount = params[1]
281
    return backend.BlockdevGrow(cfbd, amount)
282

    
283
  @staticmethod
284
  def perspective_blockdev_close(params):
285
    """Closes the given block devices.
286

    
287
    """
288
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
289
    return backend.BlockdevClose(params[0], disks)
290

    
291
  @staticmethod
292
  def perspective_blockdev_getsize(params):
293
    """Compute the sizes of the given block devices.
294

    
295
    """
296
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
297
    return backend.BlockdevGetsize(disks)
298

    
299
  @staticmethod
300
  def perspective_blockdev_export(params):
301
    """Compute the sizes of the given block devices.
302

    
303
    """
304
    disk = objects.Disk.FromDict(params[0])
305
    dest_node, dest_path, cluster_name = params[1:]
306
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
307

    
308
  # blockdev/drbd specific methods ----------
309

    
310
  @staticmethod
311
  def perspective_drbd_disconnect_net(params):
312
    """Disconnects the network connection of drbd disks.
313

    
314
    Note that this is only valid for drbd disks, so the members of the
315
    disk list must all be drbd devices.
316

    
317
    """
318
    nodes_ip, disks = params
319
    disks = [objects.Disk.FromDict(cf) for cf in disks]
320
    return backend.DrbdDisconnectNet(nodes_ip, disks)
321

    
322
  @staticmethod
323
  def perspective_drbd_attach_net(params):
324
    """Attaches the network connection of drbd disks.
325

    
326
    Note that this is only valid for drbd disks, so the members of the
327
    disk list must all be drbd devices.
328

    
329
    """
330
    nodes_ip, disks, instance_name, multimaster = params
331
    disks = [objects.Disk.FromDict(cf) for cf in disks]
332
    return backend.DrbdAttachNet(nodes_ip, disks,
333
                                     instance_name, multimaster)
334

    
335
  @staticmethod
336
  def perspective_drbd_wait_sync(params):
337
    """Wait until DRBD disks are synched.
338

    
339
    Note that this is only valid for drbd disks, so the members of the
340
    disk list must all be drbd devices.
341

    
342
    """
343
    nodes_ip, disks = params
344
    disks = [objects.Disk.FromDict(cf) for cf in disks]
345
    return backend.DrbdWaitSync(nodes_ip, disks)
346

    
347
  # export/import  --------------------------
348

    
349
  @staticmethod
350
  def perspective_snapshot_export(params):
351
    """Export a given snapshot.
352

    
353
    """
354
    disk = objects.Disk.FromDict(params[0])
355
    dest_node = params[1]
356
    instance = objects.Instance.FromDict(params[2])
357
    cluster_name = params[3]
358
    dev_idx = params[4]
359
    debug = params[5]
360
    return backend.ExportSnapshot(disk, dest_node, instance,
361
                                  cluster_name, dev_idx, debug)
362

    
363
  @staticmethod
364
  def perspective_finalize_export(params):
365
    """Expose the finalize export functionality.
366

    
367
    """
368
    instance = objects.Instance.FromDict(params[0])
369

    
370
    snap_disks = []
371
    for disk in params[1]:
372
      if isinstance(disk, bool):
373
        snap_disks.append(disk)
374
      else:
375
        snap_disks.append(objects.Disk.FromDict(disk))
376

    
377
    return backend.FinalizeExport(instance, snap_disks)
378

    
379
  @staticmethod
380
  def perspective_export_info(params):
381
    """Query information about an existing export on this node.
382

    
383
    The given path may not contain an export, in which case we return
384
    None.
385

    
386
    """
387
    path = params[0]
388
    return backend.ExportInfo(path)
389

    
390
  @staticmethod
391
  def perspective_export_list(params):
392
    """List the available exports on this node.
393

    
394
    Note that as opposed to export_info, which may query data about an
395
    export in any path, this only queries the standard Ganeti path
396
    (constants.EXPORT_DIR).
397

    
398
    """
399
    return backend.ListExports()
400

    
401
  @staticmethod
402
  def perspective_export_remove(params):
403
    """Remove an export.
404

    
405
    """
406
    export = params[0]
407
    return backend.RemoveExport(export)
408

    
409
  # volume  --------------------------
410

    
411
  @staticmethod
412
  def perspective_lv_list(params):
413
    """Query the list of logical volumes in a given volume group.
414

    
415
    """
416
    vgname = params[0]
417
    return backend.GetVolumeList(vgname)
418

    
419
  @staticmethod
420
  def perspective_vg_list(params):
421
    """Query the list of volume groups.
422

    
423
    """
424
    return backend.ListVolumeGroups()
425

    
426
  # Storage --------------------------
427

    
428
  @staticmethod
429
  def perspective_storage_list(params):
430
    """Get list of storage units.
431

    
432
    """
433
    (su_name, su_args, name, fields) = params
434
    return storage.GetStorage(su_name, *su_args).List(name, fields)
435

    
436
  @staticmethod
437
  def perspective_storage_modify(params):
438
    """Modify a storage unit.
439

    
440
    """
441
    (su_name, su_args, name, changes) = params
442
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
443

    
444
  @staticmethod
445
  def perspective_storage_execute(params):
446
    """Execute an operation on a storage unit.
447

    
448
    """
449
    (su_name, su_args, name, op) = params
450
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
451

    
452
  # bridge  --------------------------
453

    
454
  @staticmethod
455
  def perspective_bridges_exist(params):
456
    """Check if all bridges given exist on this node.
457

    
458
    """
459
    bridges_list = params[0]
460
    return backend.BridgesExist(bridges_list)
461

    
462
  # instance  --------------------------
463

    
464
  @staticmethod
465
  def perspective_instance_os_add(params):
466
    """Install an OS on a given instance.
467

    
468
    """
469
    inst_s = params[0]
470
    inst = objects.Instance.FromDict(inst_s)
471
    reinstall = params[1]
472
    debug = params[2]
473
    return backend.InstanceOsAdd(inst, reinstall, debug)
474

    
475
  @staticmethod
476
  def perspective_instance_run_rename(params):
477
    """Runs the OS rename script for an instance.
478

    
479
    """
480
    inst_s, old_name, debug = params
481
    inst = objects.Instance.FromDict(inst_s)
482
    return backend.RunRenameInstance(inst, old_name, debug)
483

    
484
  @staticmethod
485
  def perspective_instance_os_import(params):
486
    """Run the import function of an OS onto a given instance.
487

    
488
    """
489
    inst_s, src_node, src_images, cluster_name, debug = params
490
    inst = objects.Instance.FromDict(inst_s)
491
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
492
                                        cluster_name, debug)
493

    
494
  @staticmethod
495
  def perspective_instance_shutdown(params):
496
    """Shutdown an instance.
497

    
498
    """
499
    instance = objects.Instance.FromDict(params[0])
500
    timeout = params[1]
501
    return backend.InstanceShutdown(instance, timeout)
502

    
503
  @staticmethod
504
  def perspective_instance_start(params):
505
    """Start an instance.
506

    
507
    """
508
    instance = objects.Instance.FromDict(params[0])
509
    return backend.StartInstance(instance)
510

    
511
  @staticmethod
512
  def perspective_migration_info(params):
513
    """Gather information about an instance to be migrated.
514

    
515
    """
516
    instance = objects.Instance.FromDict(params[0])
517
    return backend.MigrationInfo(instance)
518

    
519
  @staticmethod
520
  def perspective_accept_instance(params):
521
    """Prepare the node to accept an instance.
522

    
523
    """
524
    instance, info, target = params
525
    instance = objects.Instance.FromDict(instance)
526
    return backend.AcceptInstance(instance, info, target)
527

    
528
  @staticmethod
529
  def perspective_finalize_migration(params):
530
    """Finalize the instance migration.
531

    
532
    """
533
    instance, info, success = params
534
    instance = objects.Instance.FromDict(instance)
535
    return backend.FinalizeMigration(instance, info, success)
536

    
537
  @staticmethod
538
  def perspective_instance_migrate(params):
539
    """Migrates an instance.
540

    
541
    """
542
    instance, target, live = params
543
    instance = objects.Instance.FromDict(instance)
544
    return backend.MigrateInstance(instance, target, live)
545

    
546
  @staticmethod
547
  def perspective_instance_reboot(params):
548
    """Reboot an instance.
549

    
550
    """
551
    instance = objects.Instance.FromDict(params[0])
552
    reboot_type = params[1]
553
    shutdown_timeout = params[2]
554
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
555

    
556
  @staticmethod
557
  def perspective_instance_info(params):
558
    """Query instance information.
559

    
560
    """
561
    return backend.GetInstanceInfo(params[0], params[1])
562

    
563
  @staticmethod
564
  def perspective_instance_migratable(params):
565
    """Query whether the specified instance can be migrated.
566

    
567
    """
568
    instance = objects.Instance.FromDict(params[0])
569
    return backend.GetInstanceMigratable(instance)
570

    
571
  @staticmethod
572
  def perspective_all_instances_info(params):
573
    """Query information about all instances.
574

    
575
    """
576
    return backend.GetAllInstancesInfo(params[0])
577

    
578
  @staticmethod
579
  def perspective_instance_list(params):
580
    """Query the list of running instances.
581

    
582
    """
583
    return backend.GetInstanceList(params[0])
584

    
585
  # node --------------------------
586

    
587
  @staticmethod
588
  def perspective_node_tcp_ping(params):
589
    """Do a TcpPing on the remote node.
590

    
591
    """
592
    return utils.TcpPing(params[1], params[2], timeout=params[3],
593
                         live_port_needed=params[4], source=params[0])
594

    
595
  @staticmethod
596
  def perspective_node_has_ip_address(params):
597
    """Checks if a node has the given ip address.
598

    
599
    """
600
    return utils.OwnIpAddress(params[0])
601

    
602
  @staticmethod
603
  def perspective_node_info(params):
604
    """Query node information.
605

    
606
    """
607
    vgname, hypervisor_type = params
608
    return backend.GetNodeInfo(vgname, hypervisor_type)
609

    
610
  @staticmethod
611
  def perspective_node_add(params):
612
    """Complete the registration of this node in the cluster.
613

    
614
    """
615
    return backend.AddNode(params[0], params[1], params[2],
616
                           params[3], params[4], params[5])
617

    
618
  @staticmethod
619
  def perspective_node_verify(params):
620
    """Run a verify sequence on this node.
621

    
622
    """
623
    return backend.VerifyNode(params[0], params[1])
624

    
625
  @staticmethod
626
  def perspective_node_start_master(params):
627
    """Promote this node to master status.
628

    
629
    """
630
    return backend.StartMaster(params[0], params[1])
631

    
632
  @staticmethod
633
  def perspective_node_stop_master(params):
634
    """Demote this node from master status.
635

    
636
    """
637
    return backend.StopMaster(params[0])
638

    
639
  @staticmethod
640
  def perspective_node_leave_cluster(params):
641
    """Cleanup after leaving a cluster.
642

    
643
    """
644
    return backend.LeaveCluster(params[0])
645

    
646
  @staticmethod
647
  def perspective_node_volumes(params):
648
    """Query the list of all logical volume groups.
649

    
650
    """
651
    return backend.NodeVolumes()
652

    
653
  @staticmethod
654
  def perspective_node_demote_from_mc(params):
655
    """Demote a node from the master candidate role.
656

    
657
    """
658
    return backend.DemoteFromMC()
659

    
660

    
661
  @staticmethod
662
  def perspective_node_powercycle(params):
663
    """Tries to powercycle the nod.
664

    
665
    """
666
    hypervisor_type = params[0]
667
    return backend.PowercycleNode(hypervisor_type)
668

    
669

    
670
  # cluster --------------------------
671

    
672
  @staticmethod
673
  def perspective_version(params):
674
    """Query version information.
675

    
676
    """
677
    return constants.PROTOCOL_VERSION
678

    
679
  @staticmethod
680
  def perspective_upload_file(params):
681
    """Upload a file.
682

    
683
    Note that the backend implementation imposes strict rules on which
684
    files are accepted.
685

    
686
    """
687
    return backend.UploadFile(*params)
688

    
689
  @staticmethod
690
  def perspective_master_info(params):
691
    """Query master information.
692

    
693
    """
694
    return backend.GetMasterInfo()
695

    
696
  @staticmethod
697
  def perspective_write_ssconf_files(params):
698
    """Write ssconf files.
699

    
700
    """
701
    (values,) = params
702
    return backend.WriteSsconfFiles(values)
703

    
704
  # os -----------------------
705

    
706
  @staticmethod
707
  def perspective_os_diagnose(params):
708
    """Query detailed information about existing OSes.
709

    
710
    """
711
    return backend.DiagnoseOS()
712

    
713
  @staticmethod
714
  def perspective_os_get(params):
715
    """Query information about a given OS.
716

    
717
    """
718
    name = params[0]
719
    os_obj = backend.OSFromDisk(name)
720
    return os_obj.ToDict()
721

    
722
  # hooks -----------------------
723

    
724
  @staticmethod
725
  def perspective_hooks_runner(params):
726
    """Run hook scripts.
727

    
728
    """
729
    hpath, phase, env = params
730
    hr = backend.HooksRunner()
731
    return hr.RunHooks(hpath, phase, env)
732

    
733
  # iallocator -----------------
734

    
735
  @staticmethod
736
  def perspective_iallocator_runner(params):
737
    """Run an iallocator script.
738

    
739
    """
740
    name, idata = params
741
    iar = backend.IAllocatorRunner()
742
    return iar.Run(name, idata)
743

    
744
  # test -----------------------
745

    
746
  @staticmethod
747
  def perspective_test_delay(params):
748
    """Run test delay.
749

    
750
    """
751
    duration = params[0]
752
    status, rval = utils.TestDelay(duration)
753
    if not status:
754
      raise backend.RPCFail(rval)
755
    return rval
756

    
757
  # file storage ---------------
758

    
759
  @staticmethod
760
  def perspective_file_storage_dir_create(params):
761
    """Create the file storage directory.
762

    
763
    """
764
    file_storage_dir = params[0]
765
    return backend.CreateFileStorageDir(file_storage_dir)
766

    
767
  @staticmethod
768
  def perspective_file_storage_dir_remove(params):
769
    """Remove the file storage directory.
770

    
771
    """
772
    file_storage_dir = params[0]
773
    return backend.RemoveFileStorageDir(file_storage_dir)
774

    
775
  @staticmethod
776
  def perspective_file_storage_dir_rename(params):
777
    """Rename the file storage directory.
778

    
779
    """
780
    old_file_storage_dir = params[0]
781
    new_file_storage_dir = params[1]
782
    return backend.RenameFileStorageDir(old_file_storage_dir,
783
                                        new_file_storage_dir)
784

    
785
  # jobs ------------------------
786

    
787
  @staticmethod
788
  @_RequireJobQueueLock
789
  def perspective_jobqueue_update(params):
790
    """Update job queue.
791

    
792
    """
793
    (file_name, content) = params
794
    return backend.JobQueueUpdate(file_name, content)
795

    
796
  @staticmethod
797
  @_RequireJobQueueLock
798
  def perspective_jobqueue_purge(params):
799
    """Purge job queue.
800

    
801
    """
802
    return backend.JobQueuePurge()
803

    
804
  @staticmethod
805
  @_RequireJobQueueLock
806
  def perspective_jobqueue_rename(params):
807
    """Rename a job queue file.
808

    
809
    """
810
    # TODO: What if a file fails to rename?
811
    return [backend.JobQueueRename(old, new) for old, new in params]
812

    
813
  @staticmethod
814
  def perspective_jobqueue_set_drain(params):
815
    """Set/unset the queue drain flag.
816

    
817
    """
818
    drain_flag = params[0]
819
    return backend.JobQueueSetDrainFlag(drain_flag)
820

    
821

    
822
  # hypervisor ---------------
823

    
824
  @staticmethod
825
  def perspective_hypervisor_validate_params(params):
826
    """Validate the hypervisor parameters.
827

    
828
    """
829
    (hvname, hvparams) = params
830
    return backend.ValidateHVParams(hvname, hvparams)
831

    
832

    
833
def CheckNoded(_, args):
834
  """Initial checks whether to run or exit with a failure.
835

    
836
  """
837
  if args: # noded doesn't take any arguments
838
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
839
                          sys.argv[0])
840
    sys.exit(constants.EXIT_FAILURE)
841

    
842

    
843
def ExecNoded(options, _):
844
  """Main node daemon function, executed with the PID file held.
845

    
846
  """
847
  if options.mlock:
848
    request_executor_class = MlockallRequestExecutor
849
    try:
850
      utils.Mlockall()
851
    except errors.NoCtypesError:
852
      logging.warning("Cannot set memory lock, ctypes module not found")
853
      request_executor_class = http.server.HttpServerRequestExecutor
854
  else:
855
    request_executor_class = http.server.HttpServerRequestExecutor
856

    
857
  # Read SSL certificate
858
  if options.ssl:
859
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
860
                                    ssl_cert_path=options.ssl_cert)
861
  else:
862
    ssl_params = None
863

    
864
  err = _PrepareQueueLock()
865
  if err is not None:
866
    # this might be some kind of file-system/permission error; while
867
    # this breaks the job queue functionality, we shouldn't prevent
868
    # startup of the whole node daemon because of this
869
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
870

    
871
  mainloop = daemon.Mainloop()
872
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
873
                          ssl_params=ssl_params, ssl_verify_peer=True,
874
                          request_executor_class=request_executor_class)
875
  server.Start()
876
  try:
877
    mainloop.Run()
878
  finally:
879
    server.Stop()
880

    
881

    
882
def main():
883
  """Main function for the node daemon.
884

    
885
  """
886
  parser = OptionParser(description="Ganeti node daemon",
887
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
888
                        version="%%prog (ganeti) %s" %
889
                        constants.RELEASE_VERSION)
890
  parser.add_option("--no-mlock", dest="mlock",
891
                    help="Do not mlock the node memory in ram",
892
                    default=True, action="store_false")
893

    
894
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
895
  dirs.append((constants.LOG_OS_DIR, 0750))
896
  dirs.append((constants.LOCK_DIR, 1777))
897
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
898
                     console_logging=True)
899

    
900

    
901
if __name__ == '__main__':
902
  main()