ed862dda2284ede832d644f5339f0cdbcfbf0000
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type log_serial: int
149   @ivar log_serial: holds the index for the next log entry
150   @ivar received_timestamp: the timestamp for when the job was received
151   @ivar start_timestmap: the timestamp for start of execution
152   @ivar end_timestamp: the timestamp for end of execution
153   @ivar lock_status: In-memory locking information for debugging
154   @ivar change: a Condition variable we use for waiting for job changes
155
156   """
157   __slots__ = ["queue", "id", "ops", "log_serial",
158                "received_timestamp", "start_timestamp", "end_timestamp",
159                "lock_status", "change",
160                "__weakref__"]
161
162   def __init__(self, queue, job_id, ops):
163     """Constructor for the _QueuedJob.
164
165     @type queue: L{JobQueue}
166     @param queue: our parent queue
167     @type job_id: job_id
168     @param job_id: our job id
169     @type ops: list
170     @param ops: the list of opcodes we hold, which will be encapsulated
171         in _QueuedOpCodes
172
173     """
174     if not ops:
175       # TODO: use a better exception
176       raise Exception("No opcodes")
177
178     self.queue = queue
179     self.id = job_id
180     self.ops = [_QueuedOpCode(op) for op in ops]
181     self.log_serial = 0
182     self.received_timestamp = TimeStampNow()
183     self.start_timestamp = None
184     self.end_timestamp = None
185
186     # In-memory attributes
187     self.lock_status = None
188
189     # Condition to wait for changes
190     self.change = threading.Condition(self.queue._lock)
191
192   @classmethod
193   def Restore(cls, queue, state):
194     """Restore a _QueuedJob from serialized state:
195
196     @type queue: L{JobQueue}
197     @param queue: to which queue the restored job belongs
198     @type state: dict
199     @param state: the serialized state
200     @rtype: _JobQueue
201     @return: the restored _JobQueue instance
202
203     """
204     obj = _QueuedJob.__new__(cls)
205     obj.queue = queue
206     obj.id = state["id"]
207     obj.received_timestamp = state.get("received_timestamp", None)
208     obj.start_timestamp = state.get("start_timestamp", None)
209     obj.end_timestamp = state.get("end_timestamp", None)
210
211     # In-memory attributes
212     obj.lock_status = None
213
214     obj.ops = []
215     obj.log_serial = 0
216     for op_state in state["ops"]:
217       op = _QueuedOpCode.Restore(op_state)
218       for log_entry in op.log:
219         obj.log_serial = max(obj.log_serial, log_entry[0])
220       obj.ops.append(op)
221
222     # Condition to wait for changes
223     obj.change = threading.Condition(obj.queue._lock)
224
225     return obj
226
227   def Serialize(self):
228     """Serialize the _JobQueue instance.
229
230     @rtype: dict
231     @return: the serialized state
232
233     """
234     return {
235       "id": self.id,
236       "ops": [op.Serialize() for op in self.ops],
237       "start_timestamp": self.start_timestamp,
238       "end_timestamp": self.end_timestamp,
239       "received_timestamp": self.received_timestamp,
240       }
241
242   def CalcStatus(self):
243     """Compute the status of this job.
244
245     This function iterates over all the _QueuedOpCodes in the job and
246     based on their status, computes the job status.
247
248     The algorithm is:
249       - if we find a cancelled, or finished with error, the job
250         status will be the same
251       - otherwise, the last opcode with the status one of:
252           - waitlock
253           - canceling
254           - running
255
256         will determine the job status
257
258       - otherwise, it means either all opcodes are queued, or success,
259         and the job status will be the same
260
261     @return: the job status
262
263     """
264     status = constants.JOB_STATUS_QUEUED
265
266     all_success = True
267     for op in self.ops:
268       if op.status == constants.OP_STATUS_SUCCESS:
269         continue
270
271       all_success = False
272
273       if op.status == constants.OP_STATUS_QUEUED:
274         pass
275       elif op.status == constants.OP_STATUS_WAITLOCK:
276         status = constants.JOB_STATUS_WAITLOCK
277       elif op.status == constants.OP_STATUS_RUNNING:
278         status = constants.JOB_STATUS_RUNNING
279       elif op.status == constants.OP_STATUS_CANCELING:
280         status = constants.JOB_STATUS_CANCELING
281         break
282       elif op.status == constants.OP_STATUS_ERROR:
283         status = constants.JOB_STATUS_ERROR
284         # The whole job fails if one opcode failed
285         break
286       elif op.status == constants.OP_STATUS_CANCELED:
287         status = constants.OP_STATUS_CANCELED
288         break
289
290     if all_success:
291       status = constants.JOB_STATUS_SUCCESS
292
293     return status
294
295   def GetLogEntries(self, newer_than):
296     """Selectively returns the log entries.
297
298     @type newer_than: None or int
299     @param newer_than: if this is None, return all log entries,
300         otherwise return only the log entries with serial higher
301         than this value
302     @rtype: list
303     @return: the list of the log entries selected
304
305     """
306     if newer_than is None:
307       serial = -1
308     else:
309       serial = newer_than
310
311     entries = []
312     for op in self.ops:
313       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
314
315     return entries
316
317   def MarkUnfinishedOps(self, status, result):
318     """Mark unfinished opcodes with a given status and result.
319
320     This is an utility function for marking all running or waiting to
321     be run opcodes with a given status. Opcodes which are already
322     finalised are not changed.
323
324     @param status: a given opcode status
325     @param result: the opcode result
326
327     """
328     not_marked = True
329     for op in self.ops:
330       if op.status in constants.OPS_FINALIZED:
331         assert not_marked, "Finalized opcodes found after non-finalized ones"
332         continue
333       op.status = status
334       op.result = result
335       not_marked = False
336
337
338 class _OpExecCallbacks(mcpu.OpExecCbBase):
339   def __init__(self, queue, job, op):
340     """Initializes this class.
341
342     @type queue: L{JobQueue}
343     @param queue: Job queue
344     @type job: L{_QueuedJob}
345     @param job: Job object
346     @type op: L{_QueuedOpCode}
347     @param op: OpCode
348
349     """
350     assert queue, "Queue is missing"
351     assert job, "Job is missing"
352     assert op, "Opcode is missing"
353
354     self._queue = queue
355     self._job = job
356     self._op = op
357
358   def NotifyStart(self):
359     """Mark the opcode as running, not lock-waiting.
360
361     This is called from the mcpu code as a notifier function, when the LU is
362     finally about to start the Exec() method. Of course, to have end-user
363     visible results, the opcode must be initially (before calling into
364     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
365
366     """
367     self._queue.acquire()
368     try:
369       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
370                                  constants.OP_STATUS_CANCELING)
371
372       # All locks are acquired by now
373       self._job.lock_status = None
374
375       # Cancel here if we were asked to
376       if self._op.status == constants.OP_STATUS_CANCELING:
377         raise CancelJob()
378
379       self._op.status = constants.OP_STATUS_RUNNING
380     finally:
381       self._queue.release()
382
383   def Feedback(self, *args):
384     """Append a log entry.
385
386     """
387     assert len(args) < 3
388
389     if len(args) == 1:
390       log_type = constants.ELOG_MESSAGE
391       log_msg = args[0]
392     else:
393       (log_type, log_msg) = args
394
395     # The time is split to make serialization easier and not lose
396     # precision.
397     timestamp = utils.SplitTime(time.time())
398
399     self._queue.acquire()
400     try:
401       self._job.log_serial += 1
402       self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
403
404       self._job.change.notifyAll()
405     finally:
406       self._queue.release()
407
408   def ReportLocks(self, msg):
409     """Write locking information to the job.
410
411     Called whenever the LU processor is waiting for a lock or has acquired one.
412
413     """
414     # Not getting the queue lock because this is a single assignment
415     self._job.lock_status = msg
416
417
418 class _JobQueueWorker(workerpool.BaseWorker):
419   """The actual job workers.
420
421   """
422   def RunTask(self, job):
423     """Job executor.
424
425     This functions processes a job. It is closely tied to the _QueuedJob and
426     _QueuedOpCode classes.
427
428     @type job: L{_QueuedJob}
429     @param job: the job to be processed
430
431     """
432     logging.info("Worker %s processing job %s",
433                   self.worker_id, job.id)
434     proc = mcpu.Processor(self.pool.queue.context, job.id)
435     queue = job.queue
436     try:
437       try:
438         count = len(job.ops)
439         for idx, op in enumerate(job.ops):
440           op_summary = op.input.Summary()
441           if op.status == constants.OP_STATUS_SUCCESS:
442             # this is a job that was partially completed before master
443             # daemon shutdown, so it can be expected that some opcodes
444             # are already completed successfully (if any did error
445             # out, then the whole job should have been aborted and not
446             # resubmitted for processing)
447             logging.info("Op %s/%s: opcode %s already processed, skipping",
448                          idx + 1, count, op_summary)
449             continue
450           try:
451             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
452                          op_summary)
453
454             queue.acquire()
455             try:
456               if op.status == constants.OP_STATUS_CANCELED:
457                 raise CancelJob()
458               assert op.status == constants.OP_STATUS_QUEUED
459               op.status = constants.OP_STATUS_WAITLOCK
460               op.result = None
461               op.start_timestamp = TimeStampNow()
462               if idx == 0: # first opcode
463                 job.start_timestamp = op.start_timestamp
464               queue.UpdateJobUnlocked(job)
465
466               input_opcode = op.input
467             finally:
468               queue.release()
469
470             # Make sure not to hold queue lock while calling ExecOpCode
471             result = proc.ExecOpCode(input_opcode,
472                                      _OpExecCallbacks(queue, job, op))
473
474             queue.acquire()
475             try:
476               op.status = constants.OP_STATUS_SUCCESS
477               op.result = result
478               op.end_timestamp = TimeStampNow()
479               queue.UpdateJobUnlocked(job)
480             finally:
481               queue.release()
482
483             logging.info("Op %s/%s: Successfully finished opcode %s",
484                          idx + 1, count, op_summary)
485           except CancelJob:
486             # Will be handled further up
487             raise
488           except Exception, err:
489             queue.acquire()
490             try:
491               try:
492                 op.status = constants.OP_STATUS_ERROR
493                 if isinstance(err, errors.GenericError):
494                   op.result = errors.EncodeException(err)
495                 else:
496                   op.result = str(err)
497                 op.end_timestamp = TimeStampNow()
498                 logging.info("Op %s/%s: Error in opcode %s: %s",
499                              idx + 1, count, op_summary, err)
500               finally:
501                 queue.UpdateJobUnlocked(job)
502             finally:
503               queue.release()
504             raise
505
506       except CancelJob:
507         queue.acquire()
508         try:
509           queue.CancelJobUnlocked(job)
510         finally:
511           queue.release()
512       except errors.GenericError, err:
513         logging.exception("Ganeti exception")
514       except:
515         logging.exception("Unhandled exception")
516     finally:
517       queue.acquire()
518       try:
519         try:
520           job.lock_status = None
521           job.end_timestamp = TimeStampNow()
522           queue.UpdateJobUnlocked(job)
523         finally:
524           job_id = job.id
525           status = job.CalcStatus()
526       finally:
527         queue.release()
528
529       logging.info("Worker %s finished job %s, status = %s",
530                    self.worker_id, job_id, status)
531
532
533 class _JobQueueWorkerPool(workerpool.WorkerPool):
534   """Simple class implementing a job-processing workerpool.
535
536   """
537   def __init__(self, queue):
538     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
539                                               _JobQueueWorker)
540     self.queue = queue
541
542
543 def _RequireOpenQueue(fn):
544   """Decorator for "public" functions.
545
546   This function should be used for all 'public' functions. That is,
547   functions usually called from other classes. Note that this should
548   be applied only to methods (not plain functions), since it expects
549   that the decorated function is called with a first argument that has
550   a '_queue_lock' argument.
551
552   @warning: Use this decorator only after utils.LockedMethod!
553
554   Example::
555     @utils.LockedMethod
556     @_RequireOpenQueue
557     def Example(self):
558       pass
559
560   """
561   def wrapper(self, *args, **kwargs):
562     assert self._queue_lock is not None, "Queue should be open"
563     return fn(self, *args, **kwargs)
564   return wrapper
565
566
567 class JobQueue(object):
568   """Queue used to manage the jobs.
569
570   @cvar _RE_JOB_FILE: regex matching the valid job file names
571
572   """
573   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
574
575   def __init__(self, context):
576     """Constructor for JobQueue.
577
578     The constructor will initialize the job queue object and then
579     start loading the current jobs from disk, either for starting them
580     (if they were queue) or for aborting them (if they were already
581     running).
582
583     @type context: GanetiContext
584     @param context: the context object for access to the configuration
585         data and other ganeti objects
586
587     """
588     self.context = context
589     self._memcache = weakref.WeakValueDictionary()
590     self._my_hostname = utils.HostInfo().name
591
592     # Locking
593     self._lock = threading.Lock()
594     self.acquire = self._lock.acquire
595     self.release = self._lock.release
596
597     # Initialize
598     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
599
600     # Read serial file
601     self._last_serial = jstore.ReadSerial()
602     assert self._last_serial is not None, ("Serial file was modified between"
603                                            " check in jstore and here")
604
605     # Get initial list of nodes
606     self._nodes = dict((n.name, n.primary_ip)
607                        for n in self.context.cfg.GetAllNodesInfo().values()
608                        if n.master_candidate)
609
610     # Remove master node
611     try:
612       del self._nodes[self._my_hostname]
613     except KeyError:
614       pass
615
616     # TODO: Check consistency across nodes
617
618     # Setup worker pool
619     self._wpool = _JobQueueWorkerPool(self)
620     try:
621       # We need to lock here because WorkerPool.AddTask() may start a job while
622       # we're still doing our work.
623       self.acquire()
624       try:
625         logging.info("Inspecting job queue")
626
627         all_job_ids = self._GetJobIDsUnlocked()
628         jobs_count = len(all_job_ids)
629         lastinfo = time.time()
630         for idx, job_id in enumerate(all_job_ids):
631           # Give an update every 1000 jobs or 10 seconds
632           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
633               idx == (jobs_count - 1)):
634             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
635                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
636             lastinfo = time.time()
637
638           job = self._LoadJobUnlocked(job_id)
639
640           # a failure in loading the job can cause 'None' to be returned
641           if job is None:
642             continue
643
644           status = job.CalcStatus()
645
646           if status in (constants.JOB_STATUS_QUEUED, ):
647             self._wpool.AddTask(job)
648
649           elif status in (constants.JOB_STATUS_RUNNING,
650                           constants.JOB_STATUS_WAITLOCK,
651                           constants.JOB_STATUS_CANCELING):
652             logging.warning("Unfinished job %s found: %s", job.id, job)
653             try:
654               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
655                                     "Unclean master daemon shutdown")
656             finally:
657               self.UpdateJobUnlocked(job)
658
659         logging.info("Job queue inspection finished")
660       finally:
661         self.release()
662     except:
663       self._wpool.TerminateWorkers()
664       raise
665
666   @utils.LockedMethod
667   @_RequireOpenQueue
668   def AddNode(self, node):
669     """Register a new node with the queue.
670
671     @type node: L{objects.Node}
672     @param node: the node object to be added
673
674     """
675     node_name = node.name
676     assert node_name != self._my_hostname
677
678     # Clean queue directory on added node
679     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
680     msg = result.fail_msg
681     if msg:
682       logging.warning("Cannot cleanup queue directory on node %s: %s",
683                       node_name, msg)
684
685     if not node.master_candidate:
686       # remove if existing, ignoring errors
687       self._nodes.pop(node_name, None)
688       # and skip the replication of the job ids
689       return
690
691     # Upload the whole queue excluding archived jobs
692     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
693
694     # Upload current serial file
695     files.append(constants.JOB_QUEUE_SERIAL_FILE)
696
697     for file_name in files:
698       # Read file content
699       content = utils.ReadFile(file_name)
700
701       result = rpc.RpcRunner.call_jobqueue_update([node_name],
702                                                   [node.primary_ip],
703                                                   file_name, content)
704       msg = result[node_name].fail_msg
705       if msg:
706         logging.error("Failed to upload file %s to node %s: %s",
707                       file_name, node_name, msg)
708
709     self._nodes[node_name] = node.primary_ip
710
711   @utils.LockedMethod
712   @_RequireOpenQueue
713   def RemoveNode(self, node_name):
714     """Callback called when removing nodes from the cluster.
715
716     @type node_name: str
717     @param node_name: the name of the node to remove
718
719     """
720     try:
721       # The queue is removed by the "leave node" RPC call.
722       del self._nodes[node_name]
723     except KeyError:
724       pass
725
726   def _CheckRpcResult(self, result, nodes, failmsg):
727     """Verifies the status of an RPC call.
728
729     Since we aim to keep consistency should this node (the current
730     master) fail, we will log errors if our rpc fail, and especially
731     log the case when more than half of the nodes fails.
732
733     @param result: the data as returned from the rpc call
734     @type nodes: list
735     @param nodes: the list of nodes we made the call to
736     @type failmsg: str
737     @param failmsg: the identifier to be used for logging
738
739     """
740     failed = []
741     success = []
742
743     for node in nodes:
744       msg = result[node].fail_msg
745       if msg:
746         failed.append(node)
747         logging.error("RPC call %s failed on node %s: %s",
748                       result[node].call, node, msg)
749       else:
750         success.append(node)
751
752     # +1 for the master node
753     if (len(success) + 1) < len(failed):
754       # TODO: Handle failing nodes
755       logging.error("More than half of the nodes failed")
756
757   def _GetNodeIp(self):
758     """Helper for returning the node name/ip list.
759
760     @rtype: (list, list)
761     @return: a tuple of two lists, the first one with the node
762         names and the second one with the node addresses
763
764     """
765     name_list = self._nodes.keys()
766     addr_list = [self._nodes[name] for name in name_list]
767     return name_list, addr_list
768
769   def _WriteAndReplicateFileUnlocked(self, file_name, data):
770     """Writes a file locally and then replicates it to all nodes.
771
772     This function will replace the contents of a file on the local
773     node and then replicate it to all the other nodes we have.
774
775     @type file_name: str
776     @param file_name: the path of the file to be replicated
777     @type data: str
778     @param data: the new contents of the file
779
780     """
781     utils.WriteFile(file_name, data=data)
782
783     names, addrs = self._GetNodeIp()
784     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
785     self._CheckRpcResult(result, self._nodes,
786                          "Updating %s" % file_name)
787
788   def _RenameFilesUnlocked(self, rename):
789     """Renames a file locally and then replicate the change.
790
791     This function will rename a file in the local queue directory
792     and then replicate this rename to all the other nodes we have.
793
794     @type rename: list of (old, new)
795     @param rename: List containing tuples mapping old to new names
796
797     """
798     # Rename them locally
799     for old, new in rename:
800       utils.RenameFile(old, new, mkdir=True)
801
802     # ... and on all nodes
803     names, addrs = self._GetNodeIp()
804     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
805     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
806
807   def _FormatJobID(self, job_id):
808     """Convert a job ID to string format.
809
810     Currently this just does C{str(job_id)} after performing some
811     checks, but if we want to change the job id format this will
812     abstract this change.
813
814     @type job_id: int or long
815     @param job_id: the numeric job id
816     @rtype: str
817     @return: the formatted job id
818
819     """
820     if not isinstance(job_id, (int, long)):
821       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
822     if job_id < 0:
823       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
824
825     return str(job_id)
826
827   @classmethod
828   def _GetArchiveDirectory(cls, job_id):
829     """Returns the archive directory for a job.
830
831     @type job_id: str
832     @param job_id: Job identifier
833     @rtype: str
834     @return: Directory name
835
836     """
837     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
838
839   def _NewSerialsUnlocked(self, count):
840     """Generates a new job identifier.
841
842     Job identifiers are unique during the lifetime of a cluster.
843
844     @type count: integer
845     @param count: how many serials to return
846     @rtype: str
847     @return: a string representing the job identifier.
848
849     """
850     assert count > 0
851     # New number
852     serial = self._last_serial + count
853
854     # Write to file
855     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
856                                         "%s\n" % serial)
857
858     result = [self._FormatJobID(v)
859               for v in range(self._last_serial, serial + 1)]
860     # Keep it only if we were able to write the file
861     self._last_serial = serial
862
863     return result
864
865   @staticmethod
866   def _GetJobPath(job_id):
867     """Returns the job file for a given job id.
868
869     @type job_id: str
870     @param job_id: the job identifier
871     @rtype: str
872     @return: the path to the job file
873
874     """
875     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
876
877   @classmethod
878   def _GetArchivedJobPath(cls, job_id):
879     """Returns the archived job file for a give job id.
880
881     @type job_id: str
882     @param job_id: the job identifier
883     @rtype: str
884     @return: the path to the archived job file
885
886     """
887     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
888     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
889
890   @classmethod
891   def _ExtractJobID(cls, name):
892     """Extract the job id from a filename.
893
894     @type name: str
895     @param name: the job filename
896     @rtype: job id or None
897     @return: the job id corresponding to the given filename,
898         or None if the filename does not represent a valid
899         job file
900
901     """
902     m = cls._RE_JOB_FILE.match(name)
903     if m:
904       return m.group(1)
905     else:
906       return None
907
908   def _GetJobIDsUnlocked(self, archived=False):
909     """Return all known job IDs.
910
911     If the parameter archived is True, archived jobs IDs will be
912     included. Currently this argument is unused.
913
914     The method only looks at disk because it's a requirement that all
915     jobs are present on disk (so in the _memcache we don't have any
916     extra IDs).
917
918     @rtype: list
919     @return: the list of job IDs
920
921     """
922     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
923     jlist = utils.NiceSort(jlist)
924     return jlist
925
926   def _ListJobFiles(self):
927     """Returns the list of current job files.
928
929     @rtype: list
930     @return: the list of job file names
931
932     """
933     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
934             if self._RE_JOB_FILE.match(name)]
935
936   def _LoadJobUnlocked(self, job_id):
937     """Loads a job from the disk or memory.
938
939     Given a job id, this will return the cached job object if
940     existing, or try to load the job from the disk. If loading from
941     disk, it will also add the job to the cache.
942
943     @param job_id: the job id
944     @rtype: L{_QueuedJob} or None
945     @return: either None or the job object
946
947     """
948     job = self._memcache.get(job_id, None)
949     if job:
950       logging.debug("Found job %s in memcache", job_id)
951       return job
952
953     filepath = self._GetJobPath(job_id)
954     logging.debug("Loading job from %s", filepath)
955     try:
956       raw_data = utils.ReadFile(filepath)
957     except IOError, err:
958       if err.errno in (errno.ENOENT, ):
959         return None
960       raise
961
962     data = serializer.LoadJson(raw_data)
963
964     try:
965       job = _QueuedJob.Restore(self, data)
966     except Exception, err:
967       new_path = self._GetArchivedJobPath(job_id)
968       if filepath == new_path:
969         # job already archived (future case)
970         logging.exception("Can't parse job %s", job_id)
971       else:
972         # non-archived case
973         logging.exception("Can't parse job %s, will archive.", job_id)
974         self._RenameFilesUnlocked([(filepath, new_path)])
975       return None
976
977     self._memcache[job_id] = job
978     logging.debug("Added job %s to the cache", job_id)
979     return job
980
981   def _GetJobsUnlocked(self, job_ids):
982     """Return a list of jobs based on their IDs.
983
984     @type job_ids: list
985     @param job_ids: either an empty list (meaning all jobs),
986         or a list of job IDs
987     @rtype: list
988     @return: the list of job objects
989
990     """
991     if not job_ids:
992       job_ids = self._GetJobIDsUnlocked()
993
994     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
995
996   @staticmethod
997   def _IsQueueMarkedDrain():
998     """Check if the queue is marked from drain.
999
1000     This currently uses the queue drain file, which makes it a
1001     per-node flag. In the future this can be moved to the config file.
1002
1003     @rtype: boolean
1004     @return: True of the job queue is marked for draining
1005
1006     """
1007     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1008
1009   @staticmethod
1010   def SetDrainFlag(drain_flag):
1011     """Sets the drain flag for the queue.
1012
1013     This is similar to the function L{backend.JobQueueSetDrainFlag},
1014     and in the future we might merge them.
1015
1016     @type drain_flag: boolean
1017     @param drain_flag: Whether to set or unset the drain flag
1018
1019     """
1020     if drain_flag:
1021       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1022     else:
1023       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1024     return True
1025
1026   @_RequireOpenQueue
1027   def _SubmitJobUnlocked(self, job_id, ops):
1028     """Create and store a new job.
1029
1030     This enters the job into our job queue and also puts it on the new
1031     queue, in order for it to be picked up by the queue processors.
1032
1033     @type job_id: job ID
1034     @param job_id: the job ID for the new job
1035     @type ops: list
1036     @param ops: The list of OpCodes that will become the new job.
1037     @rtype: job ID
1038     @return: the job ID of the newly created job
1039     @raise errors.JobQueueDrainError: if the job is marked for draining
1040
1041     """
1042     if self._IsQueueMarkedDrain():
1043       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1044
1045     # Check job queue size
1046     size = len(self._ListJobFiles())
1047     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1048       # TODO: Autoarchive jobs. Make sure it's not done on every job
1049       # submission, though.
1050       #size = ...
1051       pass
1052
1053     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1054       raise errors.JobQueueFull()
1055
1056     job = _QueuedJob(self, job_id, ops)
1057
1058     # Write to disk
1059     self.UpdateJobUnlocked(job)
1060
1061     logging.debug("Adding new job %s to the cache", job_id)
1062     self._memcache[job_id] = job
1063
1064     # Add to worker pool
1065     self._wpool.AddTask(job)
1066
1067     return job.id
1068
1069   @utils.LockedMethod
1070   @_RequireOpenQueue
1071   def SubmitJob(self, ops):
1072     """Create and store a new job.
1073
1074     @see: L{_SubmitJobUnlocked}
1075
1076     """
1077     job_id = self._NewSerialsUnlocked(1)[0]
1078     return self._SubmitJobUnlocked(job_id, ops)
1079
1080   @utils.LockedMethod
1081   @_RequireOpenQueue
1082   def SubmitManyJobs(self, jobs):
1083     """Create and store multiple jobs.
1084
1085     @see: L{_SubmitJobUnlocked}
1086
1087     """
1088     results = []
1089     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1090     for job_id, ops in zip(all_job_ids, jobs):
1091       try:
1092         data = self._SubmitJobUnlocked(job_id, ops)
1093         status = True
1094       except errors.GenericError, err:
1095         data = str(err)
1096         status = False
1097       results.append((status, data))
1098
1099     return results
1100
1101   @_RequireOpenQueue
1102   def UpdateJobUnlocked(self, job):
1103     """Update a job's on disk storage.
1104
1105     After a job has been modified, this function needs to be called in
1106     order to write the changes to disk and replicate them to the other
1107     nodes.
1108
1109     @type job: L{_QueuedJob}
1110     @param job: the changed job
1111
1112     """
1113     filename = self._GetJobPath(job.id)
1114     data = serializer.DumpJson(job.Serialize(), indent=False)
1115     logging.debug("Writing job %s to %s", job.id, filename)
1116     self._WriteAndReplicateFileUnlocked(filename, data)
1117
1118     # Notify waiters about potential changes
1119     job.change.notifyAll()
1120
1121   @utils.LockedMethod
1122   @_RequireOpenQueue
1123   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1124                         timeout):
1125     """Waits for changes in a job.
1126
1127     @type job_id: string
1128     @param job_id: Job identifier
1129     @type fields: list of strings
1130     @param fields: Which fields to check for changes
1131     @type prev_job_info: list or None
1132     @param prev_job_info: Last job information returned
1133     @type prev_log_serial: int
1134     @param prev_log_serial: Last job message serial number
1135     @type timeout: float
1136     @param timeout: maximum time to wait
1137     @rtype: tuple (job info, log entries)
1138     @return: a tuple of the job information as required via
1139         the fields parameter, and the log entries as a list
1140
1141         if the job has not changed and the timeout has expired,
1142         we instead return a special value,
1143         L{constants.JOB_NOTCHANGED}, which should be interpreted
1144         as such by the clients
1145
1146     """
1147     job = self._LoadJobUnlocked(job_id)
1148     if not job:
1149       logging.debug("Job %s not found", job_id)
1150       return None
1151
1152     def _CheckForChanges():
1153       logging.debug("Waiting for changes in job %s", job_id)
1154
1155       status = job.CalcStatus()
1156       job_info = self._GetJobInfoUnlocked(job, fields)
1157       log_entries = job.GetLogEntries(prev_log_serial)
1158
1159       # Serializing and deserializing data can cause type changes (e.g. from
1160       # tuple to list) or precision loss. We're doing it here so that we get
1161       # the same modifications as the data received from the client. Without
1162       # this, the comparison afterwards might fail without the data being
1163       # significantly different.
1164       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1165       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1166
1167       # Don't even try to wait if the job is no longer running, there will be
1168       # no changes.
1169       if (status not in (constants.JOB_STATUS_QUEUED,
1170                          constants.JOB_STATUS_RUNNING,
1171                          constants.JOB_STATUS_WAITLOCK) or
1172           prev_job_info != job_info or
1173           (log_entries and prev_log_serial != log_entries[0][0])):
1174         logging.debug("Job %s changed", job_id)
1175         return (job_info, log_entries)
1176
1177       raise utils.RetryAgain()
1178
1179     try:
1180       # Setting wait function to release the queue lock while waiting
1181       return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1182                          wait_fn=job.change.wait)
1183     except utils.RetryTimeout:
1184       return constants.JOB_NOTCHANGED
1185
1186   @utils.LockedMethod
1187   @_RequireOpenQueue
1188   def CancelJob(self, job_id):
1189     """Cancels a job.
1190
1191     This will only succeed if the job has not started yet.
1192
1193     @type job_id: string
1194     @param job_id: job ID of job to be cancelled.
1195
1196     """
1197     logging.info("Cancelling job %s", job_id)
1198
1199     job = self._LoadJobUnlocked(job_id)
1200     if not job:
1201       logging.debug("Job %s not found", job_id)
1202       return (False, "Job %s not found" % job_id)
1203
1204     job_status = job.CalcStatus()
1205
1206     if job_status not in (constants.JOB_STATUS_QUEUED,
1207                           constants.JOB_STATUS_WAITLOCK):
1208       logging.debug("Job %s is no longer waiting in the queue", job.id)
1209       return (False, "Job %s is no longer waiting in the queue" % job.id)
1210
1211     if job_status == constants.JOB_STATUS_QUEUED:
1212       self.CancelJobUnlocked(job)
1213       return (True, "Job %s canceled" % job.id)
1214
1215     elif job_status == constants.JOB_STATUS_WAITLOCK:
1216       # The worker will notice the new status and cancel the job
1217       try:
1218         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1219       finally:
1220         self.UpdateJobUnlocked(job)
1221       return (True, "Job %s will be canceled" % job.id)
1222
1223   @_RequireOpenQueue
1224   def CancelJobUnlocked(self, job):
1225     """Marks a job as canceled.
1226
1227     """
1228     try:
1229       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1230                             "Job canceled by request")
1231     finally:
1232       self.UpdateJobUnlocked(job)
1233
1234   @_RequireOpenQueue
1235   def _ArchiveJobsUnlocked(self, jobs):
1236     """Archives jobs.
1237
1238     @type jobs: list of L{_QueuedJob}
1239     @param jobs: Job objects
1240     @rtype: int
1241     @return: Number of archived jobs
1242
1243     """
1244     archive_jobs = []
1245     rename_files = []
1246     for job in jobs:
1247       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1248                                   constants.JOB_STATUS_SUCCESS,
1249                                   constants.JOB_STATUS_ERROR):
1250         logging.debug("Job %s is not yet done", job.id)
1251         continue
1252
1253       archive_jobs.append(job)
1254
1255       old = self._GetJobPath(job.id)
1256       new = self._GetArchivedJobPath(job.id)
1257       rename_files.append((old, new))
1258
1259     # TODO: What if 1..n files fail to rename?
1260     self._RenameFilesUnlocked(rename_files)
1261
1262     logging.debug("Successfully archived job(s) %s",
1263                   ", ".join(job.id for job in archive_jobs))
1264
1265     return len(archive_jobs)
1266
1267   @utils.LockedMethod
1268   @_RequireOpenQueue
1269   def ArchiveJob(self, job_id):
1270     """Archives a job.
1271
1272     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1273
1274     @type job_id: string
1275     @param job_id: Job ID of job to be archived.
1276     @rtype: bool
1277     @return: Whether job was archived
1278
1279     """
1280     logging.info("Archiving job %s", job_id)
1281
1282     job = self._LoadJobUnlocked(job_id)
1283     if not job:
1284       logging.debug("Job %s not found", job_id)
1285       return False
1286
1287     return self._ArchiveJobsUnlocked([job]) == 1
1288
1289   @utils.LockedMethod
1290   @_RequireOpenQueue
1291   def AutoArchiveJobs(self, age, timeout):
1292     """Archives all jobs based on age.
1293
1294     The method will archive all jobs which are older than the age
1295     parameter. For jobs that don't have an end timestamp, the start
1296     timestamp will be considered. The special '-1' age will cause
1297     archival of all jobs (that are not running or queued).
1298
1299     @type age: int
1300     @param age: the minimum age in seconds
1301
1302     """
1303     logging.info("Archiving jobs with age more than %s seconds", age)
1304
1305     now = time.time()
1306     end_time = now + timeout
1307     archived_count = 0
1308     last_touched = 0
1309
1310     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1311     pending = []
1312     for idx, job_id in enumerate(all_job_ids):
1313       last_touched = idx
1314
1315       # Not optimal because jobs could be pending
1316       # TODO: Measure average duration for job archival and take number of
1317       # pending jobs into account.
1318       if time.time() > end_time:
1319         break
1320
1321       # Returns None if the job failed to load
1322       job = self._LoadJobUnlocked(job_id)
1323       if job:
1324         if job.end_timestamp is None:
1325           if job.start_timestamp is None:
1326             job_age = job.received_timestamp
1327           else:
1328             job_age = job.start_timestamp
1329         else:
1330           job_age = job.end_timestamp
1331
1332         if age == -1 or now - job_age[0] > age:
1333           pending.append(job)
1334
1335           # Archive 10 jobs at a time
1336           if len(pending) >= 10:
1337             archived_count += self._ArchiveJobsUnlocked(pending)
1338             pending = []
1339
1340     if pending:
1341       archived_count += self._ArchiveJobsUnlocked(pending)
1342
1343     return (archived_count, len(all_job_ids) - last_touched - 1)
1344
1345   def _GetJobInfoUnlocked(self, job, fields):
1346     """Returns information about a job.
1347
1348     @type job: L{_QueuedJob}
1349     @param job: the job which we query
1350     @type fields: list
1351     @param fields: names of fields to return
1352     @rtype: list
1353     @return: list with one element for each field
1354     @raise errors.OpExecError: when an invalid field
1355         has been passed
1356
1357     """
1358     row = []
1359     for fname in fields:
1360       if fname == "id":
1361         row.append(job.id)
1362       elif fname == "status":
1363         row.append(job.CalcStatus())
1364       elif fname == "ops":
1365         row.append([op.input.__getstate__() for op in job.ops])
1366       elif fname == "opresult":
1367         row.append([op.result for op in job.ops])
1368       elif fname == "opstatus":
1369         row.append([op.status for op in job.ops])
1370       elif fname == "oplog":
1371         row.append([op.log for op in job.ops])
1372       elif fname == "opstart":
1373         row.append([op.start_timestamp for op in job.ops])
1374       elif fname == "opend":
1375         row.append([op.end_timestamp for op in job.ops])
1376       elif fname == "received_ts":
1377         row.append(job.received_timestamp)
1378       elif fname == "start_ts":
1379         row.append(job.start_timestamp)
1380       elif fname == "end_ts":
1381         row.append(job.end_timestamp)
1382       elif fname == "lock_status":
1383         row.append(job.lock_status)
1384       elif fname == "summary":
1385         row.append([op.input.Summary() for op in job.ops])
1386       else:
1387         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1388     return row
1389
1390   @utils.LockedMethod
1391   @_RequireOpenQueue
1392   def QueryJobs(self, job_ids, fields):
1393     """Returns a list of jobs in queue.
1394
1395     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1396     processing for each job.
1397
1398     @type job_ids: list
1399     @param job_ids: sequence of job identifiers or None for all
1400     @type fields: list
1401     @param fields: names of fields to return
1402     @rtype: list
1403     @return: list one element per job, each element being list with
1404         the requested fields
1405
1406     """
1407     jobs = []
1408
1409     for job in self._GetJobsUnlocked(job_ids):
1410       if job is None:
1411         jobs.append(None)
1412       else:
1413         jobs.append(self._GetJobInfoUnlocked(job, fields))
1414
1415     return jobs
1416
1417   @utils.LockedMethod
1418   @_RequireOpenQueue
1419   def Shutdown(self):
1420     """Stops the job queue.
1421
1422     This shutdowns all the worker threads an closes the queue.
1423
1424     """
1425     self._wpool.TerminateWorkers()
1426
1427     self._queue_lock.Close()
1428     self._queue_lock = None