queue_lock = None
+def _RequireJobQueueLock(fn):
+ """Decorator for job queue manipulating functions.
+
+ """
+ def wrapper(*args, **kwargs):
+ # Locking in exclusive, blocking mode because there could be several
+ # children running at the same time. Waiting up to 10 seconds.
+ queue_lock.Exclusive(blocking=True, timeout=10)
+ try:
+ return fn(*args, **kwargs)
+ finally:
+ queue_lock.Unlock()
+ return wrapper
+
+
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
"""The server implementation.
"""
return backend.UploadFile(*params)
+ @staticmethod
+ def perspective_master_info(params):
+ """Query master information.
+
+ """
+ return backend.GetMasterInfo()
# os -----------------------
duration = params[0]
return utils.TestDelay(duration)
+ # file storage ---------------
+
@staticmethod
def perspective_file_storage_dir_create(params):
"""Create the file storage directory.
return backend.RenameFileStorageDir(old_file_storage_dir,
new_file_storage_dir)
+ # jobs ------------------------
+
@staticmethod
+ @_RequireJobQueueLock
def perspective_jobqueue_update(params):
"""Update job queue.
"""
(file_name, content) = params
-
- # Locking in exclusive, blocking mode because there could be several
- # children running at the same time.
- # TODO: Implement nonblocking locking with retries?
- queue_lock.Exclusive(blocking=True)
- try:
- return backend.JobQueueUpdate(file_name, content)
- finally:
- queue_lock.Unlock()
+ return backend.JobQueueUpdate(file_name, content)
@staticmethod
+ @_RequireJobQueueLock
def perspective_jobqueue_purge(params):
"""Purge job queue.
"""
return backend.JobQueuePurge()
+ @staticmethod
+ @_RequireJobQueueLock
+ def perspective_jobqueue_rename(params):
+ """Rename a job queue file.
+
+ """
+ (old, new) = params
+
+ return backend.JobQueueRename(old, new)
+
class NodeDaemonHttpServer(http.HTTPServer):
def __init__(self, server_address):