Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ c46b9782

History | View | Annotate | Download (25.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48
from ganeti import serializer
49

    
50
import ganeti.http.server # pylint: disable-msg=W0611
51

    
52

    
53
# Global job queue lock; None until lazily initialized by _PrepareQueueLock()
queue_lock = None
54

    
55

    
56
def _PrepareQueueLock():
57
  """Try to prepare the queue lock.
58

    
59
  @return: None for success, otherwise an exception object
60

    
61
  """
62
  global queue_lock # pylint: disable-msg=W0603
63

    
64
  if queue_lock is not None:
65
    return None
66

    
67
  # Prepare job queue
68
  try:
69
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
70
    return None
71
  except EnvironmentError, err:
72
    return err
73

    
74

    
75
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  The wrapped function runs with the job queue lock held exclusively;
  the lock is always released afterwards, even on error.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # The queue lock must exist before we can take it; refuse to touch
    # the job queue if initialization failed
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
94

    
95

    
96
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  Serialized disk dicts are converted back into objects; any other
  argument form is returned unchanged.

  """
  if ieio == constants.IEIO_RAW_DISK:
    # One argument: the serialized disk
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )
  elif ieio == constants.IEIO_SCRIPT:
    # Two arguments: the serialized disk plus an extra parameter
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
  else:
    # Unknown I/O kind: pass arguments through untouched
    return ieioargs
109

    
110

    
111
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Custom Request Executor class that ensures NodeHttpServer children are
  locked in ram.

  """
  def __init__(self, *args, **kwargs):
    # Lock all of this (forked) process' pages into RAM *before* the
    # parent constructor starts handling the request, so request
    # processing cannot be delayed by page faults
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
120

    
121

    
122
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable-msg=R0904,W0613
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # remember our own PID so HandleRequest can signal the daemon to quit
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    Dispatches PUT requests to the matching C{perspective_*} method and
    returns the JSON-serialized (success, payload) result tuple.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result, indent=False)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                     instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_helper(params):
    """Query drbd helper.

    """
    return backend.GetDrbdUsermodeHelper()

  # export/import  --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    snap_disks = []
    for disk in params[1]:
      # a boolean entry marks a disk that was not snapshotted
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()


  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)


  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes a X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, dest, dest_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, source, source_args) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])
903

    
904

    
905
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  if not args:
    return
  # noded doesn't take any arguments
  print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                        sys.argv[0])
  sys.exit(constants.EXIT_FAILURE)
913

    
914

    
915
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  @param options: parsed command line options (mlock, ssl, ssl_key,
      ssl_cert, bind_address, port)

  """
  if options.mlock:
    # Lock the daemon itself in RAM, and use an executor that also
    # locks each forked request handler
    utils.Mlockall()
    request_executor_class = MlockallRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=request_executor_class)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # always release the listening socket, even if the mainloop raised
    server.Stop()
948

    
949

    
950
def main():
951
  """Main function for the node daemon.
952

    
953
  """
954
  parser = OptionParser(description="Ganeti node daemon",
955
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
956
                        version="%%prog (ganeti) %s" %
957
                        constants.RELEASE_VERSION)
958
  parser.add_option("--no-mlock", dest="mlock",
959
                    help="Do not mlock the node memory in ram",
960
                    default=True, action="store_false")
961

    
962
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
963
  dirs.append((constants.LOG_OS_DIR, 0750))
964
  dirs.append((constants.LOCK_DIR, 1777))
965
  dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
966
  dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
967
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
968
                     default_ssl_cert=constants.NODED_CERT_FILE,
969
                     default_ssl_key=constants.NODED_CERT_FILE,
970
                     console_logging=True)
971

    
972

    
973
# Script entry point: run the daemon only when executed directly
if __name__ == '__main__':
  main()