Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ b8d26c6e

History | View | Annotate | Download (26.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48
from ganeti import serializer
49
from ganeti import netutils
50

    
51
import ganeti.http.server # pylint: disable-msg=W0611
52

    
53

    
54
# Global job queue lock; lazily initialised by _PrepareQueueLock() and used
# by the _RequireJobQueueLock decorator below
queue_lock = None
55

    
56

    
57
def _PrepareQueueLock():
58
  """Try to prepare the queue lock.
59

    
60
  @return: None for success, otherwise an exception object
61

    
62
  """
63
  global queue_lock # pylint: disable-msg=W0603
64

    
65
  if queue_lock is not None:
66
    return None
67

    
68
  # Prepare job queue
69
  try:
70
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
71
    return None
72
  except EnvironmentError, err:
73
    return err
74

    
75

    
76
def _RequireJobQueueLock(fn):
  """Decorator holding the job queue lock around C{fn}.

  The decorated function raises L{errors.JobQueueError} if the queue
  lock cannot be initialised.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def _LockedWrapper(*args, **kwargs):
    # Take the lock in exclusive, blocking mode: several children may be
    # running at the same time; wait at most QUEUE_LOCK_TIMEOUT seconds
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return _LockedWrapper
95

    
96

    
97
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  @param ieio: the I/O type, one of the C{constants.IEIO_*} values
  @param ieioargs: the serialized I/O arguments
  @return: the arguments with serialized disks replaced by disk objects

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])

  # Any other I/O type is passed through unchanged
  return ieioargs
110

    
111

    
112
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Request executor which locks the process' memory before serving.

  This makes sure that NodeHttpServer children are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock memory first, so the request below is handled from locked pages
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
121

    
122

    
123
class NodeHttpServer(http.server.HttpServer):
124
  """The server implementation.
125

    
126
  This class holds all methods exposed over the RPC interface.
127

    
128
  """
129
  # too many public methods, and unused args - all methods get params
130
  # due to the API
131
  # pylint: disable-msg=R0904,W0613
132
  def __init__(self, *args, **kwargs):
133
    http.server.HttpServer.__init__(self, *args, **kwargs)
134
    self.noded_pid = os.getpid()
135

    
136
  def HandleRequest(self, req):
137
    """Handle a request.
138

    
139
    """
140
    if req.request_method.upper() != http.HTTP_PUT:
141
      raise http.HttpBadRequest()
142

    
143
    path = req.request_path
144
    if path.startswith("/"):
145
      path = path[1:]
146

    
147
    method = getattr(self, "perspective_%s" % path, None)
148
    if method is None:
149
      raise http.HttpNotFound()
150

    
151
    try:
152
      result = (True, method(serializer.LoadJson(req.request_body)))
153

    
154
    except backend.RPCFail, err:
155
      # our custom failure exception; str(err) works fine if the
156
      # exception was constructed with a single argument, and in
157
      # this case, err.message == err.args[0] == str(err)
158
      result = (False, str(err))
159
    except errors.QuitGanetiException, err:
160
      # Tell parent to quit
161
      logging.info("Shutting down the node daemon, arguments: %s",
162
                   str(err.args))
163
      os.kill(self.noded_pid, signal.SIGTERM)
164
      # And return the error's arguments, which must be already in
165
      # correct tuple format
166
      result = err.args
167
    except Exception, err:
168
      logging.exception("Error in RPC call")
169
      result = (False, "Error while executing backend function: %s" % str(err))
170

    
171
    return serializer.DumpJson(result, indent=False)
172

    
173
  # the new block devices  --------------------------
174

    
175
  @staticmethod
176
  def perspective_blockdev_create(params):
177
    """Create a block device.
178

    
179
    """
180
    bdev_s, size, owner, on_primary, info = params
181
    bdev = objects.Disk.FromDict(bdev_s)
182
    if bdev is None:
183
      raise ValueError("can't unserialize data!")
184
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
185

    
186
  @staticmethod
187
  def perspective_blockdev_wipe(params):
188
    """Wipe a block device.
189

    
190
    """
191
    bdev_s, offset, size = params
192
    bdev = objects.Disk.FromDict(bdev_s)
193
    return backend.BlockdevWipe(bdev, offset, size)
194

    
195
  @staticmethod
196
  def perspective_blockdev_remove(params):
197
    """Remove a block device.
198

    
199
    """
200
    bdev_s = params[0]
201
    bdev = objects.Disk.FromDict(bdev_s)
202
    return backend.BlockdevRemove(bdev)
203

    
204
  @staticmethod
205
  def perspective_blockdev_rename(params):
206
    """Remove a block device.
207

    
208
    """
209
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
210
    return backend.BlockdevRename(devlist)
211

    
212
  @staticmethod
213
  def perspective_blockdev_assemble(params):
214
    """Assemble a block device.
215

    
216
    """
217
    bdev_s, owner, on_primary = params
218
    bdev = objects.Disk.FromDict(bdev_s)
219
    if bdev is None:
220
      raise ValueError("can't unserialize data!")
221
    return backend.BlockdevAssemble(bdev, owner, on_primary)
222

    
223
  @staticmethod
224
  def perspective_blockdev_shutdown(params):
225
    """Shutdown a block device.
226

    
227
    """
228
    bdev_s = params[0]
229
    bdev = objects.Disk.FromDict(bdev_s)
230
    if bdev is None:
231
      raise ValueError("can't unserialize data!")
232
    return backend.BlockdevShutdown(bdev)
233

    
234
  @staticmethod
235
  def perspective_blockdev_addchildren(params):
236
    """Add a child to a mirror device.
237

    
238
    Note: this is only valid for mirror devices. It's the caller's duty
239
    to send a correct disk, otherwise we raise an error.
240

    
241
    """
242
    bdev_s, ndev_s = params
243
    bdev = objects.Disk.FromDict(bdev_s)
244
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
245
    if bdev is None or ndevs.count(None) > 0:
246
      raise ValueError("can't unserialize data!")
247
    return backend.BlockdevAddchildren(bdev, ndevs)
248

    
249
  @staticmethod
250
  def perspective_blockdev_removechildren(params):
251
    """Remove a child from a mirror device.
252

    
253
    This is only valid for mirror devices, of course. It's the callers
254
    duty to send a correct disk, otherwise we raise an error.
255

    
256
    """
257
    bdev_s, ndev_s = params
258
    bdev = objects.Disk.FromDict(bdev_s)
259
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
260
    if bdev is None or ndevs.count(None) > 0:
261
      raise ValueError("can't unserialize data!")
262
    return backend.BlockdevRemovechildren(bdev, ndevs)
263

    
264
  @staticmethod
265
  def perspective_blockdev_getmirrorstatus(params):
266
    """Return the mirror status for a list of disks.
267

    
268
    """
269
    disks = [objects.Disk.FromDict(dsk_s)
270
             for dsk_s in params]
271
    return [status.ToDict()
272
            for status in backend.BlockdevGetmirrorstatus(disks)]
273

    
274
  @staticmethod
275
  def perspective_blockdev_getmirrorstatus_multi(params):
276
    """Return the mirror status for a list of disks.
277

    
278
    """
279
    (node_disks, ) = params
280

    
281
    node_name = netutils.Hostname.GetSysName()
282

    
283
    disks = [objects.Disk.FromDict(dsk_s)
284
             for dsk_s in node_disks.get(node_name, [])]
285

    
286
    return [status.ToDict()
287
            for status in backend.BlockdevGetmirrorstatus(disks)]
288

    
289
  @staticmethod
290
  def perspective_blockdev_find(params):
291
    """Expose the FindBlockDevice functionality for a disk.
292

    
293
    This will try to find but not activate a disk.
294

    
295
    """
296
    disk = objects.Disk.FromDict(params[0])
297

    
298
    result = backend.BlockdevFind(disk)
299
    if result is None:
300
      return None
301

    
302
    return result.ToDict()
303

    
304
  @staticmethod
305
  def perspective_blockdev_snapshot(params):
306
    """Create a snapshot device.
307

    
308
    Note that this is only valid for LVM disks, if we get passed
309
    something else we raise an exception. The snapshot device can be
310
    remove by calling the generic block device remove call.
311

    
312
    """
313
    cfbd = objects.Disk.FromDict(params[0])
314
    return backend.BlockdevSnapshot(cfbd)
315

    
316
  @staticmethod
317
  def perspective_blockdev_grow(params):
318
    """Grow a stack of devices.
319

    
320
    """
321
    cfbd = objects.Disk.FromDict(params[0])
322
    amount = params[1]
323
    return backend.BlockdevGrow(cfbd, amount)
324

    
325
  @staticmethod
326
  def perspective_blockdev_close(params):
327
    """Closes the given block devices.
328

    
329
    """
330
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
331
    return backend.BlockdevClose(params[0], disks)
332

    
333
  @staticmethod
334
  def perspective_blockdev_getsize(params):
335
    """Compute the sizes of the given block devices.
336

    
337
    """
338
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
339
    return backend.BlockdevGetsize(disks)
340

    
341
  @staticmethod
342
  def perspective_blockdev_export(params):
343
    """Compute the sizes of the given block devices.
344

    
345
    """
346
    disk = objects.Disk.FromDict(params[0])
347
    dest_node, dest_path, cluster_name = params[1:]
348
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
349

    
350
  # blockdev/drbd specific methods ----------
351

    
352
  @staticmethod
353
  def perspective_drbd_disconnect_net(params):
354
    """Disconnects the network connection of drbd disks.
355

    
356
    Note that this is only valid for drbd disks, so the members of the
357
    disk list must all be drbd devices.
358

    
359
    """
360
    nodes_ip, disks = params
361
    disks = [objects.Disk.FromDict(cf) for cf in disks]
362
    return backend.DrbdDisconnectNet(nodes_ip, disks)
363

    
364
  @staticmethod
365
  def perspective_drbd_attach_net(params):
366
    """Attaches the network connection of drbd disks.
367

    
368
    Note that this is only valid for drbd disks, so the members of the
369
    disk list must all be drbd devices.
370

    
371
    """
372
    nodes_ip, disks, instance_name, multimaster = params
373
    disks = [objects.Disk.FromDict(cf) for cf in disks]
374
    return backend.DrbdAttachNet(nodes_ip, disks,
375
                                     instance_name, multimaster)
376

    
377
  @staticmethod
378
  def perspective_drbd_wait_sync(params):
379
    """Wait until DRBD disks are synched.
380

    
381
    Note that this is only valid for drbd disks, so the members of the
382
    disk list must all be drbd devices.
383

    
384
    """
385
    nodes_ip, disks = params
386
    disks = [objects.Disk.FromDict(cf) for cf in disks]
387
    return backend.DrbdWaitSync(nodes_ip, disks)
388

    
389
  @staticmethod
390
  def perspective_drbd_helper(params):
391
    """Query drbd helper.
392

    
393
    """
394
    return backend.GetDrbdUsermodeHelper()
395

    
396
  # export/import  --------------------------
397

    
398
  @staticmethod
399
  def perspective_finalize_export(params):
400
    """Expose the finalize export functionality.
401

    
402
    """
403
    instance = objects.Instance.FromDict(params[0])
404

    
405
    snap_disks = []
406
    for disk in params[1]:
407
      if isinstance(disk, bool):
408
        snap_disks.append(disk)
409
      else:
410
        snap_disks.append(objects.Disk.FromDict(disk))
411

    
412
    return backend.FinalizeExport(instance, snap_disks)
413

    
414
  @staticmethod
415
  def perspective_export_info(params):
416
    """Query information about an existing export on this node.
417

    
418
    The given path may not contain an export, in which case we return
419
    None.
420

    
421
    """
422
    path = params[0]
423
    return backend.ExportInfo(path)
424

    
425
  @staticmethod
426
  def perspective_export_list(params):
427
    """List the available exports on this node.
428

    
429
    Note that as opposed to export_info, which may query data about an
430
    export in any path, this only queries the standard Ganeti path
431
    (constants.EXPORT_DIR).
432

    
433
    """
434
    return backend.ListExports()
435

    
436
  @staticmethod
437
  def perspective_export_remove(params):
438
    """Remove an export.
439

    
440
    """
441
    export = params[0]
442
    return backend.RemoveExport(export)
443

    
444
  # volume  --------------------------
445

    
446
  @staticmethod
447
  def perspective_lv_list(params):
448
    """Query the list of logical volumes in a given volume group.
449

    
450
    """
451
    vgname = params[0]
452
    return backend.GetVolumeList(vgname)
453

    
454
  @staticmethod
455
  def perspective_vg_list(params):
456
    """Query the list of volume groups.
457

    
458
    """
459
    return backend.ListVolumeGroups()
460

    
461
  # Storage --------------------------
462

    
463
  @staticmethod
464
  def perspective_storage_list(params):
465
    """Get list of storage units.
466

    
467
    """
468
    (su_name, su_args, name, fields) = params
469
    return storage.GetStorage(su_name, *su_args).List(name, fields)
470

    
471
  @staticmethod
472
  def perspective_storage_modify(params):
473
    """Modify a storage unit.
474

    
475
    """
476
    (su_name, su_args, name, changes) = params
477
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
478

    
479
  @staticmethod
480
  def perspective_storage_execute(params):
481
    """Execute an operation on a storage unit.
482

    
483
    """
484
    (su_name, su_args, name, op) = params
485
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
486

    
487
  # bridge  --------------------------
488

    
489
  @staticmethod
490
  def perspective_bridges_exist(params):
491
    """Check if all bridges given exist on this node.
492

    
493
    """
494
    bridges_list = params[0]
495
    return backend.BridgesExist(bridges_list)
496

    
497
  # instance  --------------------------
498

    
499
  @staticmethod
500
  def perspective_instance_os_add(params):
501
    """Install an OS on a given instance.
502

    
503
    """
504
    inst_s = params[0]
505
    inst = objects.Instance.FromDict(inst_s)
506
    reinstall = params[1]
507
    debug = params[2]
508
    return backend.InstanceOsAdd(inst, reinstall, debug)
509

    
510
  @staticmethod
511
  def perspective_instance_run_rename(params):
512
    """Runs the OS rename script for an instance.
513

    
514
    """
515
    inst_s, old_name, debug = params
516
    inst = objects.Instance.FromDict(inst_s)
517
    return backend.RunRenameInstance(inst, old_name, debug)
518

    
519
  @staticmethod
520
  def perspective_instance_shutdown(params):
521
    """Shutdown an instance.
522

    
523
    """
524
    instance = objects.Instance.FromDict(params[0])
525
    timeout = params[1]
526
    return backend.InstanceShutdown(instance, timeout)
527

    
528
  @staticmethod
529
  def perspective_instance_start(params):
530
    """Start an instance.
531

    
532
    """
533
    instance = objects.Instance.FromDict(params[0])
534
    return backend.StartInstance(instance)
535

    
536
  @staticmethod
537
  def perspective_migration_info(params):
538
    """Gather information about an instance to be migrated.
539

    
540
    """
541
    instance = objects.Instance.FromDict(params[0])
542
    return backend.MigrationInfo(instance)
543

    
544
  @staticmethod
545
  def perspective_accept_instance(params):
546
    """Prepare the node to accept an instance.
547

    
548
    """
549
    instance, info, target = params
550
    instance = objects.Instance.FromDict(instance)
551
    return backend.AcceptInstance(instance, info, target)
552

    
553
  @staticmethod
554
  def perspective_finalize_migration(params):
555
    """Finalize the instance migration.
556

    
557
    """
558
    instance, info, success = params
559
    instance = objects.Instance.FromDict(instance)
560
    return backend.FinalizeMigration(instance, info, success)
561

    
562
  @staticmethod
563
  def perspective_instance_migrate(params):
564
    """Migrates an instance.
565

    
566
    """
567
    instance, target, live = params
568
    instance = objects.Instance.FromDict(instance)
569
    return backend.MigrateInstance(instance, target, live)
570

    
571
  @staticmethod
572
  def perspective_instance_reboot(params):
573
    """Reboot an instance.
574

    
575
    """
576
    instance = objects.Instance.FromDict(params[0])
577
    reboot_type = params[1]
578
    shutdown_timeout = params[2]
579
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
580

    
581
  @staticmethod
582
  def perspective_instance_info(params):
583
    """Query instance information.
584

    
585
    """
586
    return backend.GetInstanceInfo(params[0], params[1])
587

    
588
  @staticmethod
589
  def perspective_instance_migratable(params):
590
    """Query whether the specified instance can be migrated.
591

    
592
    """
593
    instance = objects.Instance.FromDict(params[0])
594
    return backend.GetInstanceMigratable(instance)
595

    
596
  @staticmethod
597
  def perspective_all_instances_info(params):
598
    """Query information about all instances.
599

    
600
    """
601
    return backend.GetAllInstancesInfo(params[0])
602

    
603
  @staticmethod
604
  def perspective_instance_list(params):
605
    """Query the list of running instances.
606

    
607
    """
608
    return backend.GetInstanceList(params[0])
609

    
610
  # node --------------------------
611

    
612
  @staticmethod
613
  def perspective_node_tcp_ping(params):
614
    """Do a TcpPing on the remote node.
615

    
616
    """
617
    return netutils.TcpPing(params[1], params[2], timeout=params[3],
618
                            live_port_needed=params[4], source=params[0])
619

    
620
  @staticmethod
621
  def perspective_node_has_ip_address(params):
622
    """Checks if a node has the given ip address.
623

    
624
    """
625
    return netutils.IPAddress.Own(params[0])
626

    
627
  @staticmethod
628
  def perspective_node_info(params):
629
    """Query node information.
630

    
631
    """
632
    vgname, hypervisor_type = params
633
    return backend.GetNodeInfo(vgname, hypervisor_type)
634

    
635
  @staticmethod
636
  def perspective_etc_hosts_modify(params):
637
    """Modify a node entry in /etc/hosts.
638

    
639
    """
640
    backend.EtcHostsModify(params[0], params[1], params[2])
641

    
642
    return True
643

    
644
  @staticmethod
645
  def perspective_node_verify(params):
646
    """Run a verify sequence on this node.
647

    
648
    """
649
    return backend.VerifyNode(params[0], params[1])
650

    
651
  @staticmethod
652
  def perspective_node_start_master(params):
653
    """Promote this node to master status.
654

    
655
    """
656
    return backend.StartMaster(params[0], params[1])
657

    
658
  @staticmethod
659
  def perspective_node_stop_master(params):
660
    """Demote this node from master status.
661

    
662
    """
663
    return backend.StopMaster(params[0])
664

    
665
  @staticmethod
666
  def perspective_node_leave_cluster(params):
667
    """Cleanup after leaving a cluster.
668

    
669
    """
670
    return backend.LeaveCluster(params[0])
671

    
672
  @staticmethod
673
  def perspective_node_volumes(params):
674
    """Query the list of all logical volume groups.
675

    
676
    """
677
    return backend.NodeVolumes()
678

    
679
  @staticmethod
680
  def perspective_node_demote_from_mc(params):
681
    """Demote a node from the master candidate role.
682

    
683
    """
684
    return backend.DemoteFromMC()
685

    
686

    
687
  @staticmethod
688
  def perspective_node_powercycle(params):
689
    """Tries to powercycle the nod.
690

    
691
    """
692
    hypervisor_type = params[0]
693
    return backend.PowercycleNode(hypervisor_type)
694

    
695

    
696
  # cluster --------------------------
697

    
698
  @staticmethod
699
  def perspective_version(params):
700
    """Query version information.
701

    
702
    """
703
    return constants.PROTOCOL_VERSION
704

    
705
  @staticmethod
706
  def perspective_upload_file(params):
707
    """Upload a file.
708

    
709
    Note that the backend implementation imposes strict rules on which
710
    files are accepted.
711

    
712
    """
713
    return backend.UploadFile(*params)
714

    
715
  @staticmethod
716
  def perspective_master_info(params):
717
    """Query master information.
718

    
719
    """
720
    return backend.GetMasterInfo()
721

    
722
  @staticmethod
723
  def perspective_write_ssconf_files(params):
724
    """Write ssconf files.
725

    
726
    """
727
    (values,) = params
728
    return backend.WriteSsconfFiles(values)
729

    
730
  # os -----------------------
731

    
732
  @staticmethod
733
  def perspective_os_diagnose(params):
734
    """Query detailed information about existing OSes.
735

    
736
    """
737
    return backend.DiagnoseOS()
738

    
739
  @staticmethod
740
  def perspective_os_get(params):
741
    """Query information about a given OS.
742

    
743
    """
744
    name = params[0]
745
    os_obj = backend.OSFromDisk(name)
746
    return os_obj.ToDict()
747

    
748
  @staticmethod
749
  def perspective_os_validate(params):
750
    """Run a given OS' validation routine.
751

    
752
    """
753
    required, name, checks, params = params
754
    return backend.ValidateOS(required, name, checks, params)
755

    
756
  # hooks -----------------------
757

    
758
  @staticmethod
759
  def perspective_hooks_runner(params):
760
    """Run hook scripts.
761

    
762
    """
763
    hpath, phase, env = params
764
    hr = backend.HooksRunner()
765
    return hr.RunHooks(hpath, phase, env)
766

    
767
  # iallocator -----------------
768

    
769
  @staticmethod
770
  def perspective_iallocator_runner(params):
771
    """Run an iallocator script.
772

    
773
    """
774
    name, idata = params
775
    iar = backend.IAllocatorRunner()
776
    return iar.Run(name, idata)
777

    
778
  # test -----------------------
779

    
780
  @staticmethod
781
  def perspective_test_delay(params):
782
    """Run test delay.
783

    
784
    """
785
    duration = params[0]
786
    status, rval = utils.TestDelay(duration)
787
    if not status:
788
      raise backend.RPCFail(rval)
789
    return rval
790

    
791
  # file storage ---------------
792

    
793
  @staticmethod
794
  def perspective_file_storage_dir_create(params):
795
    """Create the file storage directory.
796

    
797
    """
798
    file_storage_dir = params[0]
799
    return backend.CreateFileStorageDir(file_storage_dir)
800

    
801
  @staticmethod
802
  def perspective_file_storage_dir_remove(params):
803
    """Remove the file storage directory.
804

    
805
    """
806
    file_storage_dir = params[0]
807
    return backend.RemoveFileStorageDir(file_storage_dir)
808

    
809
  @staticmethod
810
  def perspective_file_storage_dir_rename(params):
811
    """Rename the file storage directory.
812

    
813
    """
814
    old_file_storage_dir = params[0]
815
    new_file_storage_dir = params[1]
816
    return backend.RenameFileStorageDir(old_file_storage_dir,
817
                                        new_file_storage_dir)
818

    
819
  # jobs ------------------------
820

    
821
  @staticmethod
822
  @_RequireJobQueueLock
823
  def perspective_jobqueue_update(params):
824
    """Update job queue.
825

    
826
    """
827
    (file_name, content) = params
828
    return backend.JobQueueUpdate(file_name, content)
829

    
830
  @staticmethod
831
  @_RequireJobQueueLock
832
  def perspective_jobqueue_purge(params):
833
    """Purge job queue.
834

    
835
    """
836
    return backend.JobQueuePurge()
837

    
838
  @staticmethod
839
  @_RequireJobQueueLock
840
  def perspective_jobqueue_rename(params):
841
    """Rename a job queue file.
842

    
843
    """
844
    # TODO: What if a file fails to rename?
845
    return [backend.JobQueueRename(old, new) for old, new in params]
846

    
847
  # hypervisor ---------------
848

    
849
  @staticmethod
850
  def perspective_hypervisor_validate_params(params):
851
    """Validate the hypervisor parameters.
852

    
853
    """
854
    (hvname, hvparams) = params
855
    return backend.ValidateHVParams(hvname, hvparams)
856

    
857
  # Crypto
858

    
859
  @staticmethod
860
  def perspective_x509_cert_create(params):
861
    """Creates a new X509 certificate for SSL/TLS.
862

    
863
    """
864
    (validity, ) = params
865
    return backend.CreateX509Certificate(validity)
866

    
867
  @staticmethod
868
  def perspective_x509_cert_remove(params):
869
    """Removes a X509 certificate.
870

    
871
    """
872
    (name, ) = params
873
    return backend.RemoveX509Certificate(name)
874

    
875
  # Import and export
876

    
877
  @staticmethod
878
  def perspective_import_start(params):
879
    """Starts an import daemon.
880

    
881
    """
882
    (opts_s, instance, dest, dest_args) = params
883

    
884
    opts = objects.ImportExportOptions.FromDict(opts_s)
885

    
886
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
887
                                           None, None,
888
                                           objects.Instance.FromDict(instance),
889
                                           dest,
890
                                           _DecodeImportExportIO(dest,
891
                                                                 dest_args))
892

    
893
  @staticmethod
894
  def perspective_export_start(params):
895
    """Starts an export daemon.
896

    
897
    """
898
    (opts_s, host, port, instance, source, source_args) = params
899

    
900
    opts = objects.ImportExportOptions.FromDict(opts_s)
901

    
902
    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
903
                                           host, port,
904
                                           objects.Instance.FromDict(instance),
905
                                           source,
906
                                           _DecodeImportExportIO(source,
907
                                                                 source_args))
908

    
909
  @staticmethod
910
  def perspective_impexp_status(params):
911
    """Retrieves the status of an import or export daemon.
912

    
913
    """
914
    return backend.GetImportExportStatus(params[0])
915

    
916
  @staticmethod
917
  def perspective_impexp_abort(params):
918
    """Aborts an import or export.
919

    
920
    """
921
    return backend.AbortImportExport(params[0])
922

    
923
  @staticmethod
924
  def perspective_impexp_cleanup(params):
925
    """Cleans up after an import or export.
926

    
927
    """
928
    return backend.CleanupImportExport(params[0])
929

    
930

    
931
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process if any positional arguments were given, since the
  node daemon does not accept any.

  """
  if args: # noded doesn't take any arguments
    sys.stderr.write("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]\n" %
                     sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
939

    
940

    
941
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @return: a (mainloop, server) tuple, passed on to L{ExecNoded}

  """
  # Choose the request executor; memory locking is only available when
  # the ctypes module can be loaded
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
      request_executor_class = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=request_executor_class)
  server.Start()

  return (mainloop, server)
975

    
976
def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613
  """Main node daemon function, executed with the PID file held.

  @param prep_data: the (mainloop, server) pair returned by L{PrepNoded}

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    # Stop the server even if the main loop terminated with an error
    server.Stop()
985

    
986

    
987
def main():
  """Main function for the node daemon.

  Builds the option parser and hands control to the generic daemon
  startup code.

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version=version_string)
  parser.add_option("--no-mlock", dest="mlock", action="store_false",
                    default=True,
                    help="Do not mlock the node memory in ram")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)
1003

    
1004

    
1005
# Script entry point: run the daemon when executed directly
if __name__ == "__main__":
  main()