Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 02bea2fc

History | View | Annotate | Download (22.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48

    
49
import ganeti.http.server # pylint: disable-msg=W0611
50

    
51

    
52
queue_lock = None
53

    
54

    
55
def _PrepareQueueLock():
56
  """Try to prepare the queue lock.
57

    
58
  @return: None for success, otherwise an exception object
59

    
60
  """
61
  global queue_lock # pylint: disable-msg=W0603
62

    
63
  if queue_lock is not None:
64
    return None
65

    
66
  # Prepare job queue
67
  try:
68
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
69
    return None
70
  except EnvironmentError, err:
71
    return err
72

    
73

    
74
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the queue lock is held (exclusively) around the call to
  C{fn}; the lock is released even when C{fn} raises.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      result = fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()
    return result

  return wrapper
93

    
94

    
95
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  Requests are dispatched by L{HandleRequest} to the C{perspective_*}
  static method named after the request path; each such method receives
  the deserialized request body as its single C{params} argument.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable-msg=R0904,W0613
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # remember our own PID so request handlers can signal this daemon
    # to terminate (see HandleRequest's QuitGanetiException branch)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    Only HTTP PUT is accepted; the request path (minus the leading
    slash) selects the C{perspective_*} method to run.

    @return: a C{(success, payload)} tuple for the RPC client

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      rvalue = method(req.request_body)
      return True, rvalue

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      return (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      return err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      return False, "Error while executing backend function: %s" % str(err)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    C{params} is C{(bdev_s, size, owner, on_primary, info)} where
    C{bdev_s} is a serialized L{objects.Disk}.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    C{params} is a list of C{(disk, unique_id)} pairs, the disks being
    serialized L{objects.Disk} objects.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    @return: the serialized result from the backend, or None when the
        device was not found

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    C{params} is C{(disk, dest_node, dest_path, cluster_name)}.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                     instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    C{params} is C{(disk, dest_node, instance, cluster_name, dev_idx,
    debug)}.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    dev_idx = params[4]
    debug = params[5]
    return backend.ExportSnapshot(disk, dest_node, instance,
                                  cluster_name, dev_idx, debug)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    # params[1] mixes booleans (placeholder for missing/failed snapshot
    # disks) with serialized Disk objects
    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    """
    inst_s, src_node, src_images, cluster_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
                                        cluster_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    C{params} is C{(source, target, port, timeout, live_port_needed)}.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()


  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)


  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)


  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)
820

    
821

    
822
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process when positional arguments were given, since noded
  doesn't take any arguments.

  """
  if not args:
    return
  # noded doesn't take any arguments
  usage = "Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % sys.argv[0]
  print >> sys.stderr, usage
  sys.exit(constants.EXIT_FAILURE)
830

    
831

    
832
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  @param options: parsed command-line options (ssl flag/paths, bind
      address and port); the second argument (leftover args) is unused

  """
  # lock the daemon's memory to avoid being swapped out
  utils.Mlockall()

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # shut the server down even if the main loop exits with an exception
    server.Stop()
860

    
861

    
862
def main():
863
  """Main function for the node daemon.
864

    
865
  """
866
  parser = OptionParser(description="Ganeti node daemon",
867
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
868
                        version="%%prog (ganeti) %s" %
869
                        constants.RELEASE_VERSION)
870
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
871
  dirs.append((constants.LOG_OS_DIR, 0750))
872
  dirs.append((constants.LOCK_DIR, 1777))
873
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
874

    
875

    
876
if __name__ == '__main__':
877
  main()