Disallow DES for SSL connections
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
81   @ivar stop_timestamp: timestamp for the end of the execution
82
83   """
84   __slots__ = ["input", "status", "result", "log",
85                "start_timestamp", "exec_timestamp", "end_timestamp",
86                "__weakref__"]
87
88   def __init__(self, op):
89     """Constructor for the _QuededOpCode.
90
91     @type op: L{opcodes.OpCode}
92     @param op: the opcode we encapsulate
93
94     """
95     self.input = op
96     self.status = constants.OP_STATUS_QUEUED
97     self.result = None
98     self.log = []
99     self.start_timestamp = None
100     self.exec_timestamp = None
101     self.end_timestamp = None
102
103   @classmethod
104   def Restore(cls, state):
105     """Restore the _QueuedOpCode from the serialized form.
106
107     @type state: dict
108     @param state: the serialized state
109     @rtype: _QueuedOpCode
110     @return: a new _QueuedOpCode instance
111
112     """
113     obj = _QueuedOpCode.__new__(cls)
114     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
115     obj.status = state["status"]
116     obj.result = state["result"]
117     obj.log = state["log"]
118     obj.start_timestamp = state.get("start_timestamp", None)
119     obj.exec_timestamp = state.get("exec_timestamp", None)
120     obj.end_timestamp = state.get("end_timestamp", None)
121     return obj
122
123   def Serialize(self):
124     """Serializes this _QueuedOpCode.
125
126     @rtype: dict
127     @return: the dictionary holding the serialized state
128
129     """
130     return {
131       "input": self.input.__getstate__(),
132       "status": self.status,
133       "result": self.result,
134       "log": self.log,
135       "start_timestamp": self.start_timestamp,
136       "exec_timestamp": self.exec_timestamp,
137       "end_timestamp": self.end_timestamp,
138       }
139
140
141 class _QueuedJob(object):
142   """In-memory job representation.
143
144   This is what we use to track the user-submitted jobs. Locking must
145   be taken care of by users of this class.
146
147   @type queue: L{JobQueue}
148   @ivar queue: the parent queue
149   @ivar id: the job ID
150   @type ops: list
151   @ivar ops: the list of _QueuedOpCode that constitute the job
152   @type log_serial: int
153   @ivar log_serial: holds the index for the next log entry
154   @ivar received_timestamp: the timestamp for when the job was received
155   @ivar start_timestmap: the timestamp for start of execution
156   @ivar end_timestamp: the timestamp for end of execution
157   @ivar lock_status: In-memory locking information for debugging
158   @ivar change: a Condition variable we use for waiting for job changes
159
160   """
161   # pylint: disable-msg=W0212
162   __slots__ = ["queue", "id", "ops", "log_serial",
163                "received_timestamp", "start_timestamp", "end_timestamp",
164                "lock_status", "change",
165                "__weakref__"]
166
167   def __init__(self, queue, job_id, ops):
168     """Constructor for the _QueuedJob.
169
170     @type queue: L{JobQueue}
171     @param queue: our parent queue
172     @type job_id: job_id
173     @param job_id: our job id
174     @type ops: list
175     @param ops: the list of opcodes we hold, which will be encapsulated
176         in _QueuedOpCodes
177
178     """
179     if not ops:
180       raise errors.GenericError("A job needs at least one opcode")
181
182     self.queue = queue
183     self.id = job_id
184     self.ops = [_QueuedOpCode(op) for op in ops]
185     self.log_serial = 0
186     self.received_timestamp = TimeStampNow()
187     self.start_timestamp = None
188     self.end_timestamp = None
189
190     # In-memory attributes
191     self.lock_status = None
192
193     # Condition to wait for changes
194     self.change = threading.Condition(self.queue._lock)
195
196   def __repr__(self):
197     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
198               "id=%s" % self.id,
199               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
200
201     return "<%s at %#x>" % (" ".join(status), id(self))
202
203   @classmethod
204   def Restore(cls, queue, state):
205     """Restore a _QueuedJob from serialized state:
206
207     @type queue: L{JobQueue}
208     @param queue: to which queue the restored job belongs
209     @type state: dict
210     @param state: the serialized state
211     @rtype: _JobQueue
212     @return: the restored _JobQueue instance
213
214     """
215     obj = _QueuedJob.__new__(cls)
216     obj.queue = queue
217     obj.id = state["id"]
218     obj.received_timestamp = state.get("received_timestamp", None)
219     obj.start_timestamp = state.get("start_timestamp", None)
220     obj.end_timestamp = state.get("end_timestamp", None)
221
222     # In-memory attributes
223     obj.lock_status = None
224
225     obj.ops = []
226     obj.log_serial = 0
227     for op_state in state["ops"]:
228       op = _QueuedOpCode.Restore(op_state)
229       for log_entry in op.log:
230         obj.log_serial = max(obj.log_serial, log_entry[0])
231       obj.ops.append(op)
232
233     # Condition to wait for changes
234     obj.change = threading.Condition(obj.queue._lock)
235
236     return obj
237
238   def Serialize(self):
239     """Serialize the _JobQueue instance.
240
241     @rtype: dict
242     @return: the serialized state
243
244     """
245     return {
246       "id": self.id,
247       "ops": [op.Serialize() for op in self.ops],
248       "start_timestamp": self.start_timestamp,
249       "end_timestamp": self.end_timestamp,
250       "received_timestamp": self.received_timestamp,
251       }
252
253   def CalcStatus(self):
254     """Compute the status of this job.
255
256     This function iterates over all the _QueuedOpCodes in the job and
257     based on their status, computes the job status.
258
259     The algorithm is:
260       - if we find a cancelled, or finished with error, the job
261         status will be the same
262       - otherwise, the last opcode with the status one of:
263           - waitlock
264           - canceling
265           - running
266
267         will determine the job status
268
269       - otherwise, it means either all opcodes are queued, or success,
270         and the job status will be the same
271
272     @return: the job status
273
274     """
275     status = constants.JOB_STATUS_QUEUED
276
277     all_success = True
278     for op in self.ops:
279       if op.status == constants.OP_STATUS_SUCCESS:
280         continue
281
282       all_success = False
283
284       if op.status == constants.OP_STATUS_QUEUED:
285         pass
286       elif op.status == constants.OP_STATUS_WAITLOCK:
287         status = constants.JOB_STATUS_WAITLOCK
288       elif op.status == constants.OP_STATUS_RUNNING:
289         status = constants.JOB_STATUS_RUNNING
290       elif op.status == constants.OP_STATUS_CANCELING:
291         status = constants.JOB_STATUS_CANCELING
292         break
293       elif op.status == constants.OP_STATUS_ERROR:
294         status = constants.JOB_STATUS_ERROR
295         # The whole job fails if one opcode failed
296         break
297       elif op.status == constants.OP_STATUS_CANCELED:
298         status = constants.OP_STATUS_CANCELED
299         break
300
301     if all_success:
302       status = constants.JOB_STATUS_SUCCESS
303
304     return status
305
306   def GetLogEntries(self, newer_than):
307     """Selectively returns the log entries.
308
309     @type newer_than: None or int
310     @param newer_than: if this is None, return all log entries,
311         otherwise return only the log entries with serial higher
312         than this value
313     @rtype: list
314     @return: the list of the log entries selected
315
316     """
317     if newer_than is None:
318       serial = -1
319     else:
320       serial = newer_than
321
322     entries = []
323     for op in self.ops:
324       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
325
326     return entries
327
328   def MarkUnfinishedOps(self, status, result):
329     """Mark unfinished opcodes with a given status and result.
330
331     This is an utility function for marking all running or waiting to
332     be run opcodes with a given status. Opcodes which are already
333     finalised are not changed.
334
335     @param status: a given opcode status
336     @param result: the opcode result
337
338     """
339     not_marked = True
340     for op in self.ops:
341       if op.status in constants.OPS_FINALIZED:
342         assert not_marked, "Finalized opcodes found after non-finalized ones"
343         continue
344       op.status = status
345       op.result = result
346       not_marked = False
347
348
349 class _OpExecCallbacks(mcpu.OpExecCbBase):
350   def __init__(self, queue, job, op):
351     """Initializes this class.
352
353     @type queue: L{JobQueue}
354     @param queue: Job queue
355     @type job: L{_QueuedJob}
356     @param job: Job object
357     @type op: L{_QueuedOpCode}
358     @param op: OpCode
359
360     """
361     assert queue, "Queue is missing"
362     assert job, "Job is missing"
363     assert op, "Opcode is missing"
364
365     self._queue = queue
366     self._job = job
367     self._op = op
368
369   def NotifyStart(self):
370     """Mark the opcode as running, not lock-waiting.
371
372     This is called from the mcpu code as a notifier function, when the LU is
373     finally about to start the Exec() method. Of course, to have end-user
374     visible results, the opcode must be initially (before calling into
375     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
376
377     """
378     self._queue.acquire()
379     try:
380       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
381                                  constants.OP_STATUS_CANCELING)
382
383       # All locks are acquired by now
384       self._job.lock_status = None
385
386       # Cancel here if we were asked to
387       if self._op.status == constants.OP_STATUS_CANCELING:
388         raise CancelJob()
389
390       self._op.status = constants.OP_STATUS_RUNNING
391       self._op.exec_timestamp = TimeStampNow()
392     finally:
393       self._queue.release()
394
395   def Feedback(self, *args):
396     """Append a log entry.
397
398     """
399     assert len(args) < 3
400
401     if len(args) == 1:
402       log_type = constants.ELOG_MESSAGE
403       log_msg = args[0]
404     else:
405       (log_type, log_msg) = args
406
407     # The time is split to make serialization easier and not lose
408     # precision.
409     timestamp = utils.SplitTime(time.time())
410
411     self._queue.acquire()
412     try:
413       self._job.log_serial += 1
414       self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
415
416       self._job.change.notifyAll()
417     finally:
418       self._queue.release()
419
420   def ReportLocks(self, msg):
421     """Write locking information to the job.
422
423     Called whenever the LU processor is waiting for a lock or has acquired one.
424
425     """
426     # Not getting the queue lock because this is a single assignment
427     self._job.lock_status = msg
428
429
430 class _JobQueueWorker(workerpool.BaseWorker):
431   """The actual job workers.
432
433   """
434   def RunTask(self, job): # pylint: disable-msg=W0221
435     """Job executor.
436
437     This functions processes a job. It is closely tied to the _QueuedJob and
438     _QueuedOpCode classes.
439
440     @type job: L{_QueuedJob}
441     @param job: the job to be processed
442
443     """
444     logging.info("Processing job %s", job.id)
445     proc = mcpu.Processor(self.pool.queue.context, job.id)
446     queue = job.queue
447     try:
448       try:
449         count = len(job.ops)
450         for idx, op in enumerate(job.ops):
451           op_summary = op.input.Summary()
452           if op.status == constants.OP_STATUS_SUCCESS:
453             # this is a job that was partially completed before master
454             # daemon shutdown, so it can be expected that some opcodes
455             # are already completed successfully (if any did error
456             # out, then the whole job should have been aborted and not
457             # resubmitted for processing)
458             logging.info("Op %s/%s: opcode %s already processed, skipping",
459                          idx + 1, count, op_summary)
460             continue
461           try:
462             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
463                          op_summary)
464
465             queue.acquire()
466             try:
467               if op.status == constants.OP_STATUS_CANCELED:
468                 raise CancelJob()
469               assert op.status == constants.OP_STATUS_QUEUED
470               op.status = constants.OP_STATUS_WAITLOCK
471               op.result = None
472               op.start_timestamp = TimeStampNow()
473               if idx == 0: # first opcode
474                 job.start_timestamp = op.start_timestamp
475               queue.UpdateJobUnlocked(job)
476
477               input_opcode = op.input
478             finally:
479               queue.release()
480
481             # Make sure not to hold queue lock while calling ExecOpCode
482             result = proc.ExecOpCode(input_opcode,
483                                      _OpExecCallbacks(queue, job, op))
484
485             queue.acquire()
486             try:
487               op.status = constants.OP_STATUS_SUCCESS
488               op.result = result
489               op.end_timestamp = TimeStampNow()
490               queue.UpdateJobUnlocked(job)
491             finally:
492               queue.release()
493
494             logging.info("Op %s/%s: Successfully finished opcode %s",
495                          idx + 1, count, op_summary)
496           except CancelJob:
497             # Will be handled further up
498             raise
499           except Exception, err:
500             queue.acquire()
501             try:
502               try:
503                 op.status = constants.OP_STATUS_ERROR
504                 if isinstance(err, errors.GenericError):
505                   op.result = errors.EncodeException(err)
506                 else:
507                   op.result = str(err)
508                 op.end_timestamp = TimeStampNow()
509                 logging.info("Op %s/%s: Error in opcode %s: %s",
510                              idx + 1, count, op_summary, err)
511               finally:
512                 queue.UpdateJobUnlocked(job)
513             finally:
514               queue.release()
515             raise
516
517       except CancelJob:
518         queue.acquire()
519         try:
520           queue.CancelJobUnlocked(job)
521         finally:
522           queue.release()
523       except errors.GenericError, err:
524         logging.exception("Ganeti exception")
525       except:
526         logging.exception("Unhandled exception")
527     finally:
528       queue.acquire()
529       try:
530         try:
531           job.lock_status = None
532           job.end_timestamp = TimeStampNow()
533           queue.UpdateJobUnlocked(job)
534         finally:
535           job_id = job.id
536           status = job.CalcStatus()
537       finally:
538         queue.release()
539
540       logging.info("Finished job %s, status = %s", job_id, status)
541
542
543 class _JobQueueWorkerPool(workerpool.WorkerPool):
544   """Simple class implementing a job-processing workerpool.
545
546   """
547   def __init__(self, queue):
548     super(_JobQueueWorkerPool, self).__init__("JobQueue",
549                                               JOBQUEUE_THREADS,
550                                               _JobQueueWorker)
551     self.queue = queue
552
553
554 def _RequireOpenQueue(fn):
555   """Decorator for "public" functions.
556
557   This function should be used for all 'public' functions. That is,
558   functions usually called from other classes. Note that this should
559   be applied only to methods (not plain functions), since it expects
560   that the decorated function is called with a first argument that has
561   a '_queue_filelock' argument.
562
563   @warning: Use this decorator only after utils.LockedMethod!
564
565   Example::
566     @utils.LockedMethod
567     @_RequireOpenQueue
568     def Example(self):
569       pass
570
571   """
572   def wrapper(self, *args, **kwargs):
573     # pylint: disable-msg=W0212
574     assert self._queue_filelock is not None, "Queue should be open"
575     return fn(self, *args, **kwargs)
576   return wrapper
577
578
579 class JobQueue(object):
580   """Queue used to manage the jobs.
581
582   @cvar _RE_JOB_FILE: regex matching the valid job file names
583
584   """
585   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
586
587   def __init__(self, context):
588     """Constructor for JobQueue.
589
590     The constructor will initialize the job queue object and then
591     start loading the current jobs from disk, either for starting them
592     (if they were queue) or for aborting them (if they were already
593     running).
594
595     @type context: GanetiContext
596     @param context: the context object for access to the configuration
597         data and other ganeti objects
598
599     """
600     self.context = context
601     self._memcache = weakref.WeakValueDictionary()
602     self._my_hostname = utils.HostInfo().name
603
604     # Locking
605     self._lock = threading.Lock()
606     self.acquire = self._lock.acquire
607     self.release = self._lock.release
608
609     # Initialize the queue, and acquire the filelock.
610     # This ensures no other process is working on the job queue.
611     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
612
613     # Read serial file
614     self._last_serial = jstore.ReadSerial()
615     assert self._last_serial is not None, ("Serial file was modified between"
616                                            " check in jstore and here")
617
618     # Get initial list of nodes
619     self._nodes = dict((n.name, n.primary_ip)
620                        for n in self.context.cfg.GetAllNodesInfo().values()
621                        if n.master_candidate)
622
623     # Remove master node
624     try:
625       del self._nodes[self._my_hostname]
626     except KeyError:
627       pass
628
629     # TODO: Check consistency across nodes
630
631     self._queue_size = 0
632     self._UpdateQueueSizeUnlocked()
633     self._drained = self._IsQueueMarkedDrain()
634
635     # Setup worker pool
636     self._wpool = _JobQueueWorkerPool(self)
637     try:
638       # We need to lock here because WorkerPool.AddTask() may start a job while
639       # we're still doing our work.
640       self.acquire()
641       try:
642         logging.info("Inspecting job queue")
643
644         all_job_ids = self._GetJobIDsUnlocked()
645         jobs_count = len(all_job_ids)
646         lastinfo = time.time()
647         for idx, job_id in enumerate(all_job_ids):
648           # Give an update every 1000 jobs or 10 seconds
649           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
650               idx == (jobs_count - 1)):
651             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
652                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
653             lastinfo = time.time()
654
655           job = self._LoadJobUnlocked(job_id)
656
657           # a failure in loading the job can cause 'None' to be returned
658           if job is None:
659             continue
660
661           status = job.CalcStatus()
662
663           if status in (constants.JOB_STATUS_QUEUED, ):
664             self._wpool.AddTask(job)
665
666           elif status in (constants.JOB_STATUS_RUNNING,
667                           constants.JOB_STATUS_WAITLOCK,
668                           constants.JOB_STATUS_CANCELING):
669             logging.warning("Unfinished job %s found: %s", job.id, job)
670             try:
671               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
672                                     "Unclean master daemon shutdown")
673             finally:
674               self.UpdateJobUnlocked(job)
675
676         logging.info("Job queue inspection finished")
677       finally:
678         self.release()
679     except:
680       self._wpool.TerminateWorkers()
681       raise
682
683   @utils.LockedMethod
684   @_RequireOpenQueue
685   def AddNode(self, node):
686     """Register a new node with the queue.
687
688     @type node: L{objects.Node}
689     @param node: the node object to be added
690
691     """
692     node_name = node.name
693     assert node_name != self._my_hostname
694
695     # Clean queue directory on added node
696     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
697     msg = result.fail_msg
698     if msg:
699       logging.warning("Cannot cleanup queue directory on node %s: %s",
700                       node_name, msg)
701
702     if not node.master_candidate:
703       # remove if existing, ignoring errors
704       self._nodes.pop(node_name, None)
705       # and skip the replication of the job ids
706       return
707
708     # Upload the whole queue excluding archived jobs
709     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
710
711     # Upload current serial file
712     files.append(constants.JOB_QUEUE_SERIAL_FILE)
713
714     for file_name in files:
715       # Read file content
716       content = utils.ReadFile(file_name)
717
718       result = rpc.RpcRunner.call_jobqueue_update([node_name],
719                                                   [node.primary_ip],
720                                                   file_name, content)
721       msg = result[node_name].fail_msg
722       if msg:
723         logging.error("Failed to upload file %s to node %s: %s",
724                       file_name, node_name, msg)
725
726     self._nodes[node_name] = node.primary_ip
727
728   @utils.LockedMethod
729   @_RequireOpenQueue
730   def RemoveNode(self, node_name):
731     """Callback called when removing nodes from the cluster.
732
733     @type node_name: str
734     @param node_name: the name of the node to remove
735
736     """
737     try:
738       # The queue is removed by the "leave node" RPC call.
739       del self._nodes[node_name]
740     except KeyError:
741       pass
742
743   @staticmethod
744   def _CheckRpcResult(result, nodes, failmsg):
745     """Verifies the status of an RPC call.
746
747     Since we aim to keep consistency should this node (the current
748     master) fail, we will log errors if our rpc fail, and especially
749     log the case when more than half of the nodes fails.
750
751     @param result: the data as returned from the rpc call
752     @type nodes: list
753     @param nodes: the list of nodes we made the call to
754     @type failmsg: str
755     @param failmsg: the identifier to be used for logging
756
757     """
758     failed = []
759     success = []
760
761     for node in nodes:
762       msg = result[node].fail_msg
763       if msg:
764         failed.append(node)
765         logging.error("RPC call %s (%s) failed on node %s: %s",
766                       result[node].call, failmsg, node, msg)
767       else:
768         success.append(node)
769
770     # +1 for the master node
771     if (len(success) + 1) < len(failed):
772       # TODO: Handle failing nodes
773       logging.error("More than half of the nodes failed")
774
775   def _GetNodeIp(self):
776     """Helper for returning the node name/ip list.
777
778     @rtype: (list, list)
779     @return: a tuple of two lists, the first one with the node
780         names and the second one with the node addresses
781
782     """
783     name_list = self._nodes.keys()
784     addr_list = [self._nodes[name] for name in name_list]
785     return name_list, addr_list
786
787   def _WriteAndReplicateFileUnlocked(self, file_name, data):
788     """Writes a file locally and then replicates it to all nodes.
789
790     This function will replace the contents of a file on the local
791     node and then replicate it to all the other nodes we have.
792
793     @type file_name: str
794     @param file_name: the path of the file to be replicated
795     @type data: str
796     @param data: the new contents of the file
797
798     """
799     utils.WriteFile(file_name, data=data)
800
801     names, addrs = self._GetNodeIp()
802     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
803     self._CheckRpcResult(result, self._nodes,
804                          "Updating %s" % file_name)
805
806   def _RenameFilesUnlocked(self, rename):
807     """Renames a file locally and then replicate the change.
808
809     This function will rename a file in the local queue directory
810     and then replicate this rename to all the other nodes we have.
811
812     @type rename: list of (old, new)
813     @param rename: List containing tuples mapping old to new names
814
815     """
816     # Rename them locally
817     for old, new in rename:
818       utils.RenameFile(old, new, mkdir=True)
819
820     # ... and on all nodes
821     names, addrs = self._GetNodeIp()
822     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
823     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
824
825   @staticmethod
826   def _FormatJobID(job_id):
827     """Convert a job ID to string format.
828
829     Currently this just does C{str(job_id)} after performing some
830     checks, but if we want to change the job id format this will
831     abstract this change.
832
833     @type job_id: int or long
834     @param job_id: the numeric job id
835     @rtype: str
836     @return: the formatted job id
837
838     """
839     if not isinstance(job_id, (int, long)):
840       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
841     if job_id < 0:
842       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
843
844     return str(job_id)
845
846   @classmethod
847   def _GetArchiveDirectory(cls, job_id):
848     """Returns the archive directory for a job.
849
850     @type job_id: str
851     @param job_id: Job identifier
852     @rtype: str
853     @return: Directory name
854
855     """
856     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
857
858   def _NewSerialsUnlocked(self, count):
859     """Generates a new job identifier.
860
861     Job identifiers are unique during the lifetime of a cluster.
862
863     @type count: integer
864     @param count: how many serials to return
865     @rtype: str
866     @return: a string representing the job identifier.
867
868     """
869     assert count > 0
870     # New number
871     serial = self._last_serial + count
872
873     # Write to file
874     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
875                                         "%s\n" % serial)
876
877     result = [self._FormatJobID(v)
878               for v in range(self._last_serial, serial + 1)]
879     # Keep it only if we were able to write the file
880     self._last_serial = serial
881
882     return result
883
884   @staticmethod
885   def _GetJobPath(job_id):
886     """Returns the job file for a given job id.
887
888     @type job_id: str
889     @param job_id: the job identifier
890     @rtype: str
891     @return: the path to the job file
892
893     """
894     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
895
896   @classmethod
897   def _GetArchivedJobPath(cls, job_id):
898     """Returns the archived job file for a give job id.
899
900     @type job_id: str
901     @param job_id: the job identifier
902     @rtype: str
903     @return: the path to the archived job file
904
905     """
906     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
907                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
908
909   def _GetJobIDsUnlocked(self, sort=True):
910     """Return all known job IDs.
911
912     The method only looks at disk because it's a requirement that all
913     jobs are present on disk (so in the _memcache we don't have any
914     extra IDs).
915
916     @type sort: boolean
917     @param sort: perform sorting on the returned job ids
918     @rtype: list
919     @return: the list of job IDs
920
921     """
922     jlist = []
923     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR, sort=False):
924       m = self._RE_JOB_FILE.match(filename)
925       if m:
926         jlist.append(m.group(1))
927     if sort:
928       jlist = utils.NiceSort(jlist)
929     return jlist
930
931   def _LoadJobUnlocked(self, job_id):
932     """Loads a job from the disk or memory.
933
934     Given a job id, this will return the cached job object if
935     existing, or try to load the job from the disk. If loading from
936     disk, it will also add the job to the cache.
937
938     @param job_id: the job id
939     @rtype: L{_QueuedJob} or None
940     @return: either None or the job object
941
942     """
943     job = self._memcache.get(job_id, None)
944     if job:
945       logging.debug("Found job %s in memcache", job_id)
946       return job
947
948     filepath = self._GetJobPath(job_id)
949     logging.debug("Loading job from %s", filepath)
950     try:
951       raw_data = utils.ReadFile(filepath)
952     except IOError, err:
953       if err.errno in (errno.ENOENT, ):
954         return None
955       raise
956
957     data = serializer.LoadJson(raw_data)
958
959     try:
960       job = _QueuedJob.Restore(self, data)
961     except Exception, err: # pylint: disable-msg=W0703
962       new_path = self._GetArchivedJobPath(job_id)
963       if filepath == new_path:
964         # job already archived (future case)
965         logging.exception("Can't parse job %s", job_id)
966       else:
967         # non-archived case
968         logging.exception("Can't parse job %s, will archive.", job_id)
969         self._RenameFilesUnlocked([(filepath, new_path)])
970       return None
971
972     self._memcache[job_id] = job
973     logging.debug("Added job %s to the cache", job_id)
974     return job
975
976   def _GetJobsUnlocked(self, job_ids):
977     """Return a list of jobs based on their IDs.
978
979     @type job_ids: list
980     @param job_ids: either an empty list (meaning all jobs),
981         or a list of job IDs
982     @rtype: list
983     @return: the list of job objects
984
985     """
986     if not job_ids:
987       job_ids = self._GetJobIDsUnlocked()
988
989     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
990
991   @staticmethod
992   def _IsQueueMarkedDrain():
993     """Check if the queue is marked from drain.
994
995     This currently uses the queue drain file, which makes it a
996     per-node flag. In the future this can be moved to the config file.
997
998     @rtype: boolean
999     @return: True of the job queue is marked for draining
1000
1001     """
1002     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1003
1004   def _UpdateQueueSizeUnlocked(self):
1005     """Update the queue size.
1006
1007     """
1008     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1009
1010   @utils.LockedMethod
1011   @_RequireOpenQueue
1012   def SetDrainFlag(self, drain_flag):
1013     """Sets the drain flag for the queue.
1014
1015     @type drain_flag: boolean
1016     @param drain_flag: Whether to set or unset the drain flag
1017
1018     """
1019     if drain_flag:
1020       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1021     else:
1022       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1023
1024     self._drained = drain_flag
1025
1026     return True
1027
1028   @_RequireOpenQueue
1029   def _SubmitJobUnlocked(self, job_id, ops):
1030     """Create and store a new job.
1031
1032     This enters the job into our job queue and also puts it on the new
1033     queue, in order for it to be picked up by the queue processors.
1034
1035     @type job_id: job ID
1036     @param job_id: the job ID for the new job
1037     @type ops: list
1038     @param ops: The list of OpCodes that will become the new job.
1039     @rtype: job ID
1040     @return: the job ID of the newly created job
1041     @raise errors.JobQueueDrainError: if the job is marked for draining
1042
1043     """
1044     # Ok when sharing the big job queue lock, as the drain file is created when
1045     # the lock is exclusive.
1046     if self._drained:
1047       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1048
1049     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1050       raise errors.JobQueueFull()
1051
1052     job = _QueuedJob(self, job_id, ops)
1053
1054     # Write to disk
1055     self.UpdateJobUnlocked(job)
1056
1057     self._queue_size += 1
1058
1059     logging.debug("Adding new job %s to the cache", job_id)
1060     self._memcache[job_id] = job
1061
1062     # Add to worker pool
1063     self._wpool.AddTask(job)
1064
1065     return job.id
1066
1067   @utils.LockedMethod
1068   @_RequireOpenQueue
1069   def SubmitJob(self, ops):
1070     """Create and store a new job.
1071
1072     @see: L{_SubmitJobUnlocked}
1073
1074     """
1075     job_id = self._NewSerialsUnlocked(1)[0]
1076     return self._SubmitJobUnlocked(job_id, ops)
1077
1078   @utils.LockedMethod
1079   @_RequireOpenQueue
1080   def SubmitManyJobs(self, jobs):
1081     """Create and store multiple jobs.
1082
1083     @see: L{_SubmitJobUnlocked}
1084
1085     """
1086     results = []
1087     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1088     for job_id, ops in zip(all_job_ids, jobs):
1089       try:
1090         data = self._SubmitJobUnlocked(job_id, ops)
1091         status = True
1092       except errors.GenericError, err:
1093         data = str(err)
1094         status = False
1095       results.append((status, data))
1096
1097     return results
1098
1099   @_RequireOpenQueue
1100   def UpdateJobUnlocked(self, job):
1101     """Update a job's on disk storage.
1102
1103     After a job has been modified, this function needs to be called in
1104     order to write the changes to disk and replicate them to the other
1105     nodes.
1106
1107     @type job: L{_QueuedJob}
1108     @param job: the changed job
1109
1110     """
1111     filename = self._GetJobPath(job.id)
1112     data = serializer.DumpJson(job.Serialize(), indent=False)
1113     logging.debug("Writing job %s to %s", job.id, filename)
1114     self._WriteAndReplicateFileUnlocked(filename, data)
1115
1116     # Notify waiters about potential changes
1117     job.change.notifyAll()
1118
1119   @utils.LockedMethod
1120   @_RequireOpenQueue
1121   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1122                         timeout):
1123     """Waits for changes in a job.
1124
1125     @type job_id: string
1126     @param job_id: Job identifier
1127     @type fields: list of strings
1128     @param fields: Which fields to check for changes
1129     @type prev_job_info: list or None
1130     @param prev_job_info: Last job information returned
1131     @type prev_log_serial: int
1132     @param prev_log_serial: Last job message serial number
1133     @type timeout: float
1134     @param timeout: maximum time to wait
1135     @rtype: tuple (job info, log entries)
1136     @return: a tuple of the job information as required via
1137         the fields parameter, and the log entries as a list
1138
1139         if the job has not changed and the timeout has expired,
1140         we instead return a special value,
1141         L{constants.JOB_NOTCHANGED}, which should be interpreted
1142         as such by the clients
1143
1144     """
1145     job = self._LoadJobUnlocked(job_id)
1146     if not job:
1147       logging.debug("Job %s not found", job_id)
1148       return None
1149
1150     def _CheckForChanges():
1151       logging.debug("Waiting for changes in job %s", job_id)
1152
1153       status = job.CalcStatus()
1154       job_info = self._GetJobInfoUnlocked(job, fields)
1155       log_entries = job.GetLogEntries(prev_log_serial)
1156
1157       # Serializing and deserializing data can cause type changes (e.g. from
1158       # tuple to list) or precision loss. We're doing it here so that we get
1159       # the same modifications as the data received from the client. Without
1160       # this, the comparison afterwards might fail without the data being
1161       # significantly different.
1162       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1163       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1164
1165       # Don't even try to wait if the job is no longer running, there will be
1166       # no changes.
1167       if (status not in (constants.JOB_STATUS_QUEUED,
1168                          constants.JOB_STATUS_RUNNING,
1169                          constants.JOB_STATUS_WAITLOCK) or
1170           prev_job_info != job_info or
1171           (log_entries and prev_log_serial != log_entries[0][0])):
1172         logging.debug("Job %s changed", job_id)
1173         return (job_info, log_entries)
1174
1175       raise utils.RetryAgain()
1176
1177     try:
1178       # Setting wait function to release the queue lock while waiting
1179       return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1180                          wait_fn=job.change.wait)
1181     except utils.RetryTimeout:
1182       return constants.JOB_NOTCHANGED
1183
1184   @utils.LockedMethod
1185   @_RequireOpenQueue
1186   def CancelJob(self, job_id):
1187     """Cancels a job.
1188
1189     This will only succeed if the job has not started yet.
1190
1191     @type job_id: string
1192     @param job_id: job ID of job to be cancelled.
1193
1194     """
1195     logging.info("Cancelling job %s", job_id)
1196
1197     job = self._LoadJobUnlocked(job_id)
1198     if not job:
1199       logging.debug("Job %s not found", job_id)
1200       return (False, "Job %s not found" % job_id)
1201
1202     job_status = job.CalcStatus()
1203
1204     if job_status not in (constants.JOB_STATUS_QUEUED,
1205                           constants.JOB_STATUS_WAITLOCK):
1206       logging.debug("Job %s is no longer waiting in the queue", job.id)
1207       return (False, "Job %s is no longer waiting in the queue" % job.id)
1208
1209     if job_status == constants.JOB_STATUS_QUEUED:
1210       self.CancelJobUnlocked(job)
1211       return (True, "Job %s canceled" % job.id)
1212
1213     elif job_status == constants.JOB_STATUS_WAITLOCK:
1214       # The worker will notice the new status and cancel the job
1215       try:
1216         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1217       finally:
1218         self.UpdateJobUnlocked(job)
1219       return (True, "Job %s will be canceled" % job.id)
1220
1221   @_RequireOpenQueue
1222   def CancelJobUnlocked(self, job):
1223     """Marks a job as canceled.
1224
1225     """
1226     try:
1227       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1228                             "Job canceled by request")
1229     finally:
1230       self.UpdateJobUnlocked(job)
1231
1232   @_RequireOpenQueue
1233   def _ArchiveJobsUnlocked(self, jobs):
1234     """Archives jobs.
1235
1236     @type jobs: list of L{_QueuedJob}
1237     @param jobs: Job objects
1238     @rtype: int
1239     @return: Number of archived jobs
1240
1241     """
1242     archive_jobs = []
1243     rename_files = []
1244     for job in jobs:
1245       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1246                                   constants.JOB_STATUS_SUCCESS,
1247                                   constants.JOB_STATUS_ERROR):
1248         logging.debug("Job %s is not yet done", job.id)
1249         continue
1250
1251       archive_jobs.append(job)
1252
1253       old = self._GetJobPath(job.id)
1254       new = self._GetArchivedJobPath(job.id)
1255       rename_files.append((old, new))
1256
1257     # TODO: What if 1..n files fail to rename?
1258     self._RenameFilesUnlocked(rename_files)
1259
1260     logging.debug("Successfully archived job(s) %s",
1261                   utils.CommaJoin(job.id for job in archive_jobs))
1262
1263     # Since we haven't quite checked, above, if we succeeded or failed renaming
1264     # the files, we update the cached queue size from the filesystem. When we
1265     # get around to fix the TODO: above, we can use the number of actually
1266     # archived jobs to fix this.
1267     self._UpdateQueueSizeUnlocked()
1268     return len(archive_jobs)
1269
1270   @utils.LockedMethod
1271   @_RequireOpenQueue
1272   def ArchiveJob(self, job_id):
1273     """Archives a job.
1274
1275     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1276
1277     @type job_id: string
1278     @param job_id: Job ID of job to be archived.
1279     @rtype: bool
1280     @return: Whether job was archived
1281
1282     """
1283     logging.info("Archiving job %s", job_id)
1284
1285     job = self._LoadJobUnlocked(job_id)
1286     if not job:
1287       logging.debug("Job %s not found", job_id)
1288       return False
1289
1290     return self._ArchiveJobsUnlocked([job]) == 1
1291
1292   @utils.LockedMethod
1293   @_RequireOpenQueue
1294   def AutoArchiveJobs(self, age, timeout):
1295     """Archives all jobs based on age.
1296
1297     The method will archive all jobs which are older than the age
1298     parameter. For jobs that don't have an end timestamp, the start
1299     timestamp will be considered. The special '-1' age will cause
1300     archival of all jobs (that are not running or queued).
1301
1302     @type age: int
1303     @param age: the minimum age in seconds
1304
1305     """
1306     logging.info("Archiving jobs with age more than %s seconds", age)
1307
1308     now = time.time()
1309     end_time = now + timeout
1310     archived_count = 0
1311     last_touched = 0
1312
1313     all_job_ids = self._GetJobIDsUnlocked()
1314     pending = []
1315     for idx, job_id in enumerate(all_job_ids):
1316       last_touched = idx + 1
1317
1318       # Not optimal because jobs could be pending
1319       # TODO: Measure average duration for job archival and take number of
1320       # pending jobs into account.
1321       if time.time() > end_time:
1322         break
1323
1324       # Returns None if the job failed to load
1325       job = self._LoadJobUnlocked(job_id)
1326       if job:
1327         if job.end_timestamp is None:
1328           if job.start_timestamp is None:
1329             job_age = job.received_timestamp
1330           else:
1331             job_age = job.start_timestamp
1332         else:
1333           job_age = job.end_timestamp
1334
1335         if age == -1 or now - job_age[0] > age:
1336           pending.append(job)
1337
1338           # Archive 10 jobs at a time
1339           if len(pending) >= 10:
1340             archived_count += self._ArchiveJobsUnlocked(pending)
1341             pending = []
1342
1343     if pending:
1344       archived_count += self._ArchiveJobsUnlocked(pending)
1345
1346     return (archived_count, len(all_job_ids) - last_touched)
1347
1348   @staticmethod
1349   def _GetJobInfoUnlocked(job, fields):
1350     """Returns information about a job.
1351
1352     @type job: L{_QueuedJob}
1353     @param job: the job which we query
1354     @type fields: list
1355     @param fields: names of fields to return
1356     @rtype: list
1357     @return: list with one element for each field
1358     @raise errors.OpExecError: when an invalid field
1359         has been passed
1360
1361     """
1362     row = []
1363     for fname in fields:
1364       if fname == "id":
1365         row.append(job.id)
1366       elif fname == "status":
1367         row.append(job.CalcStatus())
1368       elif fname == "ops":
1369         row.append([op.input.__getstate__() for op in job.ops])
1370       elif fname == "opresult":
1371         row.append([op.result for op in job.ops])
1372       elif fname == "opstatus":
1373         row.append([op.status for op in job.ops])
1374       elif fname == "oplog":
1375         row.append([op.log for op in job.ops])
1376       elif fname == "opstart":
1377         row.append([op.start_timestamp for op in job.ops])
1378       elif fname == "opexec":
1379         row.append([op.exec_timestamp for op in job.ops])
1380       elif fname == "opend":
1381         row.append([op.end_timestamp for op in job.ops])
1382       elif fname == "received_ts":
1383         row.append(job.received_timestamp)
1384       elif fname == "start_ts":
1385         row.append(job.start_timestamp)
1386       elif fname == "end_ts":
1387         row.append(job.end_timestamp)
1388       elif fname == "lock_status":
1389         row.append(job.lock_status)
1390       elif fname == "summary":
1391         row.append([op.input.Summary() for op in job.ops])
1392       else:
1393         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1394     return row
1395
1396   @utils.LockedMethod
1397   @_RequireOpenQueue
1398   def QueryJobs(self, job_ids, fields):
1399     """Returns a list of jobs in queue.
1400
1401     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1402     processing for each job.
1403
1404     @type job_ids: list
1405     @param job_ids: sequence of job identifiers or None for all
1406     @type fields: list
1407     @param fields: names of fields to return
1408     @rtype: list
1409     @return: list one element per job, each element being list with
1410         the requested fields
1411
1412     """
1413     jobs = []
1414
1415     for job in self._GetJobsUnlocked(job_ids):
1416       if job is None:
1417         jobs.append(None)
1418       else:
1419         jobs.append(self._GetJobInfoUnlocked(job, fields))
1420
1421     return jobs
1422
1423   @utils.LockedMethod
1424   @_RequireOpenQueue
1425   def Shutdown(self):
1426     """Stops the job queue.
1427
1428     This shutdowns all the worker threads an closes the queue.
1429
1430     """
1431     self._wpool.TerminateWorkers()
1432
1433     self._queue_filelock.Close()
1434     self._queue_filelock = None