Convert to static methods (where appropriate)
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type log_serial: int
149   @ivar log_serial: holds the index for the next log entry
150   @ivar received_timestamp: the timestamp for when the job was received
151   @ivar start_timestmap: the timestamp for start of execution
152   @ivar end_timestamp: the timestamp for end of execution
153   @ivar lock_status: In-memory locking information for debugging
154   @ivar change: a Condition variable we use for waiting for job changes
155
156   """
157   # pylint: disable-msg=W0212
158   __slots__ = ["queue", "id", "ops", "log_serial",
159                "received_timestamp", "start_timestamp", "end_timestamp",
160                "lock_status", "change",
161                "__weakref__"]
162
163   def __init__(self, queue, job_id, ops):
164     """Constructor for the _QueuedJob.
165
166     @type queue: L{JobQueue}
167     @param queue: our parent queue
168     @type job_id: job_id
169     @param job_id: our job id
170     @type ops: list
171     @param ops: the list of opcodes we hold, which will be encapsulated
172         in _QueuedOpCodes
173
174     """
175     if not ops:
176       # TODO: use a better exception
177       raise Exception("No opcodes")
178
179     self.queue = queue
180     self.id = job_id
181     self.ops = [_QueuedOpCode(op) for op in ops]
182     self.log_serial = 0
183     self.received_timestamp = TimeStampNow()
184     self.start_timestamp = None
185     self.end_timestamp = None
186
187     # In-memory attributes
188     self.lock_status = None
189
190     # Condition to wait for changes
191     self.change = threading.Condition(self.queue._lock)
192
193   @classmethod
194   def Restore(cls, queue, state):
195     """Restore a _QueuedJob from serialized state:
196
197     @type queue: L{JobQueue}
198     @param queue: to which queue the restored job belongs
199     @type state: dict
200     @param state: the serialized state
201     @rtype: _JobQueue
202     @return: the restored _JobQueue instance
203
204     """
205     obj = _QueuedJob.__new__(cls)
206     obj.queue = queue
207     obj.id = state["id"]
208     obj.received_timestamp = state.get("received_timestamp", None)
209     obj.start_timestamp = state.get("start_timestamp", None)
210     obj.end_timestamp = state.get("end_timestamp", None)
211
212     # In-memory attributes
213     obj.lock_status = None
214
215     obj.ops = []
216     obj.log_serial = 0
217     for op_state in state["ops"]:
218       op = _QueuedOpCode.Restore(op_state)
219       for log_entry in op.log:
220         obj.log_serial = max(obj.log_serial, log_entry[0])
221       obj.ops.append(op)
222
223     # Condition to wait for changes
224     obj.change = threading.Condition(obj.queue._lock)
225
226     return obj
227
228   def Serialize(self):
229     """Serialize the _JobQueue instance.
230
231     @rtype: dict
232     @return: the serialized state
233
234     """
235     return {
236       "id": self.id,
237       "ops": [op.Serialize() for op in self.ops],
238       "start_timestamp": self.start_timestamp,
239       "end_timestamp": self.end_timestamp,
240       "received_timestamp": self.received_timestamp,
241       }
242
243   def CalcStatus(self):
244     """Compute the status of this job.
245
246     This function iterates over all the _QueuedOpCodes in the job and
247     based on their status, computes the job status.
248
249     The algorithm is:
250       - if we find a cancelled, or finished with error, the job
251         status will be the same
252       - otherwise, the last opcode with the status one of:
253           - waitlock
254           - canceling
255           - running
256
257         will determine the job status
258
259       - otherwise, it means either all opcodes are queued, or success,
260         and the job status will be the same
261
262     @return: the job status
263
264     """
265     status = constants.JOB_STATUS_QUEUED
266
267     all_success = True
268     for op in self.ops:
269       if op.status == constants.OP_STATUS_SUCCESS:
270         continue
271
272       all_success = False
273
274       if op.status == constants.OP_STATUS_QUEUED:
275         pass
276       elif op.status == constants.OP_STATUS_WAITLOCK:
277         status = constants.JOB_STATUS_WAITLOCK
278       elif op.status == constants.OP_STATUS_RUNNING:
279         status = constants.JOB_STATUS_RUNNING
280       elif op.status == constants.OP_STATUS_CANCELING:
281         status = constants.JOB_STATUS_CANCELING
282         break
283       elif op.status == constants.OP_STATUS_ERROR:
284         status = constants.JOB_STATUS_ERROR
285         # The whole job fails if one opcode failed
286         break
287       elif op.status == constants.OP_STATUS_CANCELED:
288         status = constants.OP_STATUS_CANCELED
289         break
290
291     if all_success:
292       status = constants.JOB_STATUS_SUCCESS
293
294     return status
295
296   def GetLogEntries(self, newer_than):
297     """Selectively returns the log entries.
298
299     @type newer_than: None or int
300     @param newer_than: if this is None, return all log entries,
301         otherwise return only the log entries with serial higher
302         than this value
303     @rtype: list
304     @return: the list of the log entries selected
305
306     """
307     if newer_than is None:
308       serial = -1
309     else:
310       serial = newer_than
311
312     entries = []
313     for op in self.ops:
314       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
315
316     return entries
317
318   def MarkUnfinishedOps(self, status, result):
319     """Mark unfinished opcodes with a given status and result.
320
321     This is an utility function for marking all running or waiting to
322     be run opcodes with a given status. Opcodes which are already
323     finalised are not changed.
324
325     @param status: a given opcode status
326     @param result: the opcode result
327
328     """
329     not_marked = True
330     for op in self.ops:
331       if op.status in constants.OPS_FINALIZED:
332         assert not_marked, "Finalized opcodes found after non-finalized ones"
333         continue
334       op.status = status
335       op.result = result
336       not_marked = False
337
338
339 class _OpExecCallbacks(mcpu.OpExecCbBase):
340   def __init__(self, queue, job, op):
341     """Initializes this class.
342
343     @type queue: L{JobQueue}
344     @param queue: Job queue
345     @type job: L{_QueuedJob}
346     @param job: Job object
347     @type op: L{_QueuedOpCode}
348     @param op: OpCode
349
350     """
351     assert queue, "Queue is missing"
352     assert job, "Job is missing"
353     assert op, "Opcode is missing"
354
355     self._queue = queue
356     self._job = job
357     self._op = op
358
359   def NotifyStart(self):
360     """Mark the opcode as running, not lock-waiting.
361
362     This is called from the mcpu code as a notifier function, when the LU is
363     finally about to start the Exec() method. Of course, to have end-user
364     visible results, the opcode must be initially (before calling into
365     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
366
367     """
368     self._queue.acquire()
369     try:
370       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
371                                  constants.OP_STATUS_CANCELING)
372
373       # All locks are acquired by now
374       self._job.lock_status = None
375
376       # Cancel here if we were asked to
377       if self._op.status == constants.OP_STATUS_CANCELING:
378         raise CancelJob()
379
380       self._op.status = constants.OP_STATUS_RUNNING
381     finally:
382       self._queue.release()
383
384   def Feedback(self, *args):
385     """Append a log entry.
386
387     """
388     assert len(args) < 3
389
390     if len(args) == 1:
391       log_type = constants.ELOG_MESSAGE
392       log_msg = args[0]
393     else:
394       (log_type, log_msg) = args
395
396     # The time is split to make serialization easier and not lose
397     # precision.
398     timestamp = utils.SplitTime(time.time())
399
400     self._queue.acquire()
401     try:
402       self._job.log_serial += 1
403       self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
404
405       self._job.change.notifyAll()
406     finally:
407       self._queue.release()
408
409   def ReportLocks(self, msg):
410     """Write locking information to the job.
411
412     Called whenever the LU processor is waiting for a lock or has acquired one.
413
414     """
415     # Not getting the queue lock because this is a single assignment
416     self._job.lock_status = msg
417
418
419 class _JobQueueWorker(workerpool.BaseWorker):
420   """The actual job workers.
421
422   """
423   def RunTask(self, job): # pylint: disable-msg=W0221
424     """Job executor.
425
426     This functions processes a job. It is closely tied to the _QueuedJob and
427     _QueuedOpCode classes.
428
429     @type job: L{_QueuedJob}
430     @param job: the job to be processed
431
432     """
433     logging.info("Worker %s processing job %s",
434                   self.worker_id, job.id)
435     proc = mcpu.Processor(self.pool.queue.context, job.id)
436     queue = job.queue
437     try:
438       try:
439         count = len(job.ops)
440         for idx, op in enumerate(job.ops):
441           op_summary = op.input.Summary()
442           if op.status == constants.OP_STATUS_SUCCESS:
443             # this is a job that was partially completed before master
444             # daemon shutdown, so it can be expected that some opcodes
445             # are already completed successfully (if any did error
446             # out, then the whole job should have been aborted and not
447             # resubmitted for processing)
448             logging.info("Op %s/%s: opcode %s already processed, skipping",
449                          idx + 1, count, op_summary)
450             continue
451           try:
452             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
453                          op_summary)
454
455             queue.acquire()
456             try:
457               if op.status == constants.OP_STATUS_CANCELED:
458                 raise CancelJob()
459               assert op.status == constants.OP_STATUS_QUEUED
460               op.status = constants.OP_STATUS_WAITLOCK
461               op.result = None
462               op.start_timestamp = TimeStampNow()
463               if idx == 0: # first opcode
464                 job.start_timestamp = op.start_timestamp
465               queue.UpdateJobUnlocked(job)
466
467               input_opcode = op.input
468             finally:
469               queue.release()
470
471             # Make sure not to hold queue lock while calling ExecOpCode
472             result = proc.ExecOpCode(input_opcode,
473                                      _OpExecCallbacks(queue, job, op))
474
475             queue.acquire()
476             try:
477               op.status = constants.OP_STATUS_SUCCESS
478               op.result = result
479               op.end_timestamp = TimeStampNow()
480               queue.UpdateJobUnlocked(job)
481             finally:
482               queue.release()
483
484             logging.info("Op %s/%s: Successfully finished opcode %s",
485                          idx + 1, count, op_summary)
486           except CancelJob:
487             # Will be handled further up
488             raise
489           except Exception, err:
490             queue.acquire()
491             try:
492               try:
493                 op.status = constants.OP_STATUS_ERROR
494                 if isinstance(err, errors.GenericError):
495                   op.result = errors.EncodeException(err)
496                 else:
497                   op.result = str(err)
498                 op.end_timestamp = TimeStampNow()
499                 logging.info("Op %s/%s: Error in opcode %s: %s",
500                              idx + 1, count, op_summary, err)
501               finally:
502                 queue.UpdateJobUnlocked(job)
503             finally:
504               queue.release()
505             raise
506
507       except CancelJob:
508         queue.acquire()
509         try:
510           queue.CancelJobUnlocked(job)
511         finally:
512           queue.release()
513       except errors.GenericError, err:
514         logging.exception("Ganeti exception")
515       except:
516         logging.exception("Unhandled exception")
517     finally:
518       queue.acquire()
519       try:
520         try:
521           job.lock_status = None
522           job.end_timestamp = TimeStampNow()
523           queue.UpdateJobUnlocked(job)
524         finally:
525           job_id = job.id
526           status = job.CalcStatus()
527       finally:
528         queue.release()
529
530       logging.info("Worker %s finished job %s, status = %s",
531                    self.worker_id, job_id, status)
532
533
534 class _JobQueueWorkerPool(workerpool.WorkerPool):
535   """Simple class implementing a job-processing workerpool.
536
537   """
538   def __init__(self, queue):
539     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
540                                               _JobQueueWorker)
541     self.queue = queue
542
543
544 def _RequireOpenQueue(fn):
545   """Decorator for "public" functions.
546
547   This function should be used for all 'public' functions. That is,
548   functions usually called from other classes. Note that this should
549   be applied only to methods (not plain functions), since it expects
550   that the decorated function is called with a first argument that has
551   a '_queue_lock' argument.
552
553   @warning: Use this decorator only after utils.LockedMethod!
554
555   Example::
556     @utils.LockedMethod
557     @_RequireOpenQueue
558     def Example(self):
559       pass
560
561   """
562   def wrapper(self, *args, **kwargs):
563     # pylint: disable-msg=W0212
564     assert self._queue_lock is not None, "Queue should be open"
565     return fn(self, *args, **kwargs)
566   return wrapper
567
568
569 class JobQueue(object):
570   """Queue used to manage the jobs.
571
572   @cvar _RE_JOB_FILE: regex matching the valid job file names
573
574   """
575   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
576
577   def __init__(self, context):
578     """Constructor for JobQueue.
579
580     The constructor will initialize the job queue object and then
581     start loading the current jobs from disk, either for starting them
582     (if they were queue) or for aborting them (if they were already
583     running).
584
585     @type context: GanetiContext
586     @param context: the context object for access to the configuration
587         data and other ganeti objects
588
589     """
590     self.context = context
591     self._memcache = weakref.WeakValueDictionary()
592     self._my_hostname = utils.HostInfo().name
593
594     # Locking
595     self._lock = threading.Lock()
596     self.acquire = self._lock.acquire
597     self.release = self._lock.release
598
599     # Initialize
600     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
601
602     # Read serial file
603     self._last_serial = jstore.ReadSerial()
604     assert self._last_serial is not None, ("Serial file was modified between"
605                                            " check in jstore and here")
606
607     # Get initial list of nodes
608     self._nodes = dict((n.name, n.primary_ip)
609                        for n in self.context.cfg.GetAllNodesInfo().values()
610                        if n.master_candidate)
611
612     # Remove master node
613     try:
614       del self._nodes[self._my_hostname]
615     except KeyError:
616       pass
617
618     # TODO: Check consistency across nodes
619
620     # Setup worker pool
621     self._wpool = _JobQueueWorkerPool(self)
622     try:
623       # We need to lock here because WorkerPool.AddTask() may start a job while
624       # we're still doing our work.
625       self.acquire()
626       try:
627         logging.info("Inspecting job queue")
628
629         all_job_ids = self._GetJobIDsUnlocked()
630         jobs_count = len(all_job_ids)
631         lastinfo = time.time()
632         for idx, job_id in enumerate(all_job_ids):
633           # Give an update every 1000 jobs or 10 seconds
634           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
635               idx == (jobs_count - 1)):
636             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
637                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
638             lastinfo = time.time()
639
640           job = self._LoadJobUnlocked(job_id)
641
642           # a failure in loading the job can cause 'None' to be returned
643           if job is None:
644             continue
645
646           status = job.CalcStatus()
647
648           if status in (constants.JOB_STATUS_QUEUED, ):
649             self._wpool.AddTask(job)
650
651           elif status in (constants.JOB_STATUS_RUNNING,
652                           constants.JOB_STATUS_WAITLOCK,
653                           constants.JOB_STATUS_CANCELING):
654             logging.warning("Unfinished job %s found: %s", job.id, job)
655             try:
656               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
657                                     "Unclean master daemon shutdown")
658             finally:
659               self.UpdateJobUnlocked(job)
660
661         logging.info("Job queue inspection finished")
662       finally:
663         self.release()
664     except:
665       self._wpool.TerminateWorkers()
666       raise
667
668   @utils.LockedMethod
669   @_RequireOpenQueue
670   def AddNode(self, node):
671     """Register a new node with the queue.
672
673     @type node: L{objects.Node}
674     @param node: the node object to be added
675
676     """
677     node_name = node.name
678     assert node_name != self._my_hostname
679
680     # Clean queue directory on added node
681     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
682     msg = result.fail_msg
683     if msg:
684       logging.warning("Cannot cleanup queue directory on node %s: %s",
685                       node_name, msg)
686
687     if not node.master_candidate:
688       # remove if existing, ignoring errors
689       self._nodes.pop(node_name, None)
690       # and skip the replication of the job ids
691       return
692
693     # Upload the whole queue excluding archived jobs
694     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
695
696     # Upload current serial file
697     files.append(constants.JOB_QUEUE_SERIAL_FILE)
698
699     for file_name in files:
700       # Read file content
701       content = utils.ReadFile(file_name)
702
703       result = rpc.RpcRunner.call_jobqueue_update([node_name],
704                                                   [node.primary_ip],
705                                                   file_name, content)
706       msg = result[node_name].fail_msg
707       if msg:
708         logging.error("Failed to upload file %s to node %s: %s",
709                       file_name, node_name, msg)
710
711     self._nodes[node_name] = node.primary_ip
712
713   @utils.LockedMethod
714   @_RequireOpenQueue
715   def RemoveNode(self, node_name):
716     """Callback called when removing nodes from the cluster.
717
718     @type node_name: str
719     @param node_name: the name of the node to remove
720
721     """
722     try:
723       # The queue is removed by the "leave node" RPC call.
724       del self._nodes[node_name]
725     except KeyError:
726       pass
727
728   @staticmethod
729   def _CheckRpcResult(result, nodes, failmsg):
730     """Verifies the status of an RPC call.
731
732     Since we aim to keep consistency should this node (the current
733     master) fail, we will log errors if our rpc fail, and especially
734     log the case when more than half of the nodes fails.
735
736     @param result: the data as returned from the rpc call
737     @type nodes: list
738     @param nodes: the list of nodes we made the call to
739     @type failmsg: str
740     @param failmsg: the identifier to be used for logging
741
742     """
743     failed = []
744     success = []
745
746     for node in nodes:
747       msg = result[node].fail_msg
748       if msg:
749         failed.append(node)
750         logging.error("RPC call %s failed on node %s: %s",
751                       result[node].call, node, msg)
752       else:
753         success.append(node)
754
755     # +1 for the master node
756     if (len(success) + 1) < len(failed):
757       # TODO: Handle failing nodes
758       logging.error("More than half of the nodes failed")
759
760   def _GetNodeIp(self):
761     """Helper for returning the node name/ip list.
762
763     @rtype: (list, list)
764     @return: a tuple of two lists, the first one with the node
765         names and the second one with the node addresses
766
767     """
768     name_list = self._nodes.keys()
769     addr_list = [self._nodes[name] for name in name_list]
770     return name_list, addr_list
771
772   def _WriteAndReplicateFileUnlocked(self, file_name, data):
773     """Writes a file locally and then replicates it to all nodes.
774
775     This function will replace the contents of a file on the local
776     node and then replicate it to all the other nodes we have.
777
778     @type file_name: str
779     @param file_name: the path of the file to be replicated
780     @type data: str
781     @param data: the new contents of the file
782
783     """
784     utils.WriteFile(file_name, data=data)
785
786     names, addrs = self._GetNodeIp()
787     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
788     self._CheckRpcResult(result, self._nodes,
789                          "Updating %s" % file_name)
790
791   def _RenameFilesUnlocked(self, rename):
792     """Renames a file locally and then replicate the change.
793
794     This function will rename a file in the local queue directory
795     and then replicate this rename to all the other nodes we have.
796
797     @type rename: list of (old, new)
798     @param rename: List containing tuples mapping old to new names
799
800     """
801     # Rename them locally
802     for old, new in rename:
803       utils.RenameFile(old, new, mkdir=True)
804
805     # ... and on all nodes
806     names, addrs = self._GetNodeIp()
807     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
808     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
809
810   @staticmethod
811   def _FormatJobID(job_id):
812     """Convert a job ID to string format.
813
814     Currently this just does C{str(job_id)} after performing some
815     checks, but if we want to change the job id format this will
816     abstract this change.
817
818     @type job_id: int or long
819     @param job_id: the numeric job id
820     @rtype: str
821     @return: the formatted job id
822
823     """
824     if not isinstance(job_id, (int, long)):
825       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
826     if job_id < 0:
827       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
828
829     return str(job_id)
830
831   @classmethod
832   def _GetArchiveDirectory(cls, job_id):
833     """Returns the archive directory for a job.
834
835     @type job_id: str
836     @param job_id: Job identifier
837     @rtype: str
838     @return: Directory name
839
840     """
841     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
842
843   def _NewSerialsUnlocked(self, count):
844     """Generates a new job identifier.
845
846     Job identifiers are unique during the lifetime of a cluster.
847
848     @type count: integer
849     @param count: how many serials to return
850     @rtype: str
851     @return: a string representing the job identifier.
852
853     """
854     assert count > 0
855     # New number
856     serial = self._last_serial + count
857
858     # Write to file
859     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
860                                         "%s\n" % serial)
861
862     result = [self._FormatJobID(v)
863               for v in range(self._last_serial, serial + 1)]
864     # Keep it only if we were able to write the file
865     self._last_serial = serial
866
867     return result
868
869   @staticmethod
870   def _GetJobPath(job_id):
871     """Returns the job file for a given job id.
872
873     @type job_id: str
874     @param job_id: the job identifier
875     @rtype: str
876     @return: the path to the job file
877
878     """
879     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
880
881   @classmethod
882   def _GetArchivedJobPath(cls, job_id):
883     """Returns the archived job file for a give job id.
884
885     @type job_id: str
886     @param job_id: the job identifier
887     @rtype: str
888     @return: the path to the archived job file
889
890     """
891     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
892     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
893
894   @classmethod
895   def _ExtractJobID(cls, name):
896     """Extract the job id from a filename.
897
898     @type name: str
899     @param name: the job filename
900     @rtype: job id or None
901     @return: the job id corresponding to the given filename,
902         or None if the filename does not represent a valid
903         job file
904
905     """
906     m = cls._RE_JOB_FILE.match(name)
907     if m:
908       return m.group(1)
909     else:
910       return None
911
912   def _GetJobIDsUnlocked(self, archived=False):
913     """Return all known job IDs.
914
915     If the parameter archived is True, archived jobs IDs will be
916     included. Currently this argument is unused.
917
918     The method only looks at disk because it's a requirement that all
919     jobs are present on disk (so in the _memcache we don't have any
920     extra IDs).
921
922     @rtype: list
923     @return: the list of job IDs
924
925     """
926     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
927     jlist = utils.NiceSort(jlist)
928     return jlist
929
930   def _ListJobFiles(self):
931     """Returns the list of current job files.
932
933     @rtype: list
934     @return: the list of job file names
935
936     """
937     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
938             if self._RE_JOB_FILE.match(name)]
939
940   def _LoadJobUnlocked(self, job_id):
941     """Loads a job from the disk or memory.
942
943     Given a job id, this will return the cached job object if
944     existing, or try to load the job from the disk. If loading from
945     disk, it will also add the job to the cache.
946
947     @param job_id: the job id
948     @rtype: L{_QueuedJob} or None
949     @return: either None or the job object
950
951     """
952     job = self._memcache.get(job_id, None)
953     if job:
954       logging.debug("Found job %s in memcache", job_id)
955       return job
956
957     filepath = self._GetJobPath(job_id)
958     logging.debug("Loading job from %s", filepath)
959     try:
960       raw_data = utils.ReadFile(filepath)
961     except IOError, err:
962       if err.errno in (errno.ENOENT, ):
963         return None
964       raise
965
966     data = serializer.LoadJson(raw_data)
967
968     try:
969       job = _QueuedJob.Restore(self, data)
970     except Exception, err: # pylint: disable-msg=W0703
971       new_path = self._GetArchivedJobPath(job_id)
972       if filepath == new_path:
973         # job already archived (future case)
974         logging.exception("Can't parse job %s", job_id)
975       else:
976         # non-archived case
977         logging.exception("Can't parse job %s, will archive.", job_id)
978         self._RenameFilesUnlocked([(filepath, new_path)])
979       return None
980
981     self._memcache[job_id] = job
982     logging.debug("Added job %s to the cache", job_id)
983     return job
984
985   def _GetJobsUnlocked(self, job_ids):
986     """Return a list of jobs based on their IDs.
987
988     @type job_ids: list
989     @param job_ids: either an empty list (meaning all jobs),
990         or a list of job IDs
991     @rtype: list
992     @return: the list of job objects
993
994     """
995     if not job_ids:
996       job_ids = self._GetJobIDsUnlocked()
997
998     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
999
1000   @staticmethod
1001   def _IsQueueMarkedDrain():
1002     """Check if the queue is marked from drain.
1003
1004     This currently uses the queue drain file, which makes it a
1005     per-node flag. In the future this can be moved to the config file.
1006
1007     @rtype: boolean
1008     @return: True of the job queue is marked for draining
1009
1010     """
1011     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1012
1013   @staticmethod
1014   def SetDrainFlag(drain_flag):
1015     """Sets the drain flag for the queue.
1016
1017     This is similar to the function L{backend.JobQueueSetDrainFlag},
1018     and in the future we might merge them.
1019
1020     @type drain_flag: boolean
1021     @param drain_flag: Whether to set or unset the drain flag
1022
1023     """
1024     if drain_flag:
1025       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1026     else:
1027       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1028     return True
1029
1030   @_RequireOpenQueue
1031   def _SubmitJobUnlocked(self, job_id, ops):
1032     """Create and store a new job.
1033
1034     This enters the job into our job queue and also puts it on the new
1035     queue, in order for it to be picked up by the queue processors.
1036
1037     @type job_id: job ID
1038     @param job_id: the job ID for the new job
1039     @type ops: list
1040     @param ops: The list of OpCodes that will become the new job.
1041     @rtype: job ID
1042     @return: the job ID of the newly created job
1043     @raise errors.JobQueueDrainError: if the job is marked for draining
1044
1045     """
1046     if self._IsQueueMarkedDrain():
1047       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1048
1049     # Check job queue size
1050     size = len(self._ListJobFiles())
1051     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1052       # TODO: Autoarchive jobs. Make sure it's not done on every job
1053       # submission, though.
1054       #size = ...
1055       pass
1056
1057     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1058       raise errors.JobQueueFull()
1059
1060     job = _QueuedJob(self, job_id, ops)
1061
1062     # Write to disk
1063     self.UpdateJobUnlocked(job)
1064
1065     logging.debug("Adding new job %s to the cache", job_id)
1066     self._memcache[job_id] = job
1067
1068     # Add to worker pool
1069     self._wpool.AddTask(job)
1070
1071     return job.id
1072
1073   @utils.LockedMethod
1074   @_RequireOpenQueue
1075   def SubmitJob(self, ops):
1076     """Create and store a new job.
1077
1078     @see: L{_SubmitJobUnlocked}
1079
1080     """
1081     job_id = self._NewSerialsUnlocked(1)[0]
1082     return self._SubmitJobUnlocked(job_id, ops)
1083
1084   @utils.LockedMethod
1085   @_RequireOpenQueue
1086   def SubmitManyJobs(self, jobs):
1087     """Create and store multiple jobs.
1088
1089     @see: L{_SubmitJobUnlocked}
1090
1091     """
1092     results = []
1093     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1094     for job_id, ops in zip(all_job_ids, jobs):
1095       try:
1096         data = self._SubmitJobUnlocked(job_id, ops)
1097         status = True
1098       except errors.GenericError, err:
1099         data = str(err)
1100         status = False
1101       results.append((status, data))
1102
1103     return results
1104
1105   @_RequireOpenQueue
1106   def UpdateJobUnlocked(self, job):
1107     """Update a job's on disk storage.
1108
1109     After a job has been modified, this function needs to be called in
1110     order to write the changes to disk and replicate them to the other
1111     nodes.
1112
1113     @type job: L{_QueuedJob}
1114     @param job: the changed job
1115
1116     """
1117     filename = self._GetJobPath(job.id)
1118     data = serializer.DumpJson(job.Serialize(), indent=False)
1119     logging.debug("Writing job %s to %s", job.id, filename)
1120     self._WriteAndReplicateFileUnlocked(filename, data)
1121
1122     # Notify waiters about potential changes
1123     job.change.notifyAll()
1124
1125   @utils.LockedMethod
1126   @_RequireOpenQueue
1127   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1128                         timeout):
1129     """Waits for changes in a job.
1130
1131     @type job_id: string
1132     @param job_id: Job identifier
1133     @type fields: list of strings
1134     @param fields: Which fields to check for changes
1135     @type prev_job_info: list or None
1136     @param prev_job_info: Last job information returned
1137     @type prev_log_serial: int
1138     @param prev_log_serial: Last job message serial number
1139     @type timeout: float
1140     @param timeout: maximum time to wait
1141     @rtype: tuple (job info, log entries)
1142     @return: a tuple of the job information as required via
1143         the fields parameter, and the log entries as a list
1144
1145         if the job has not changed and the timeout has expired,
1146         we instead return a special value,
1147         L{constants.JOB_NOTCHANGED}, which should be interpreted
1148         as such by the clients
1149
1150     """
1151     job = self._LoadJobUnlocked(job_id)
1152     if not job:
1153       logging.debug("Job %s not found", job_id)
1154       return None
1155
1156     def _CheckForChanges():
1157       logging.debug("Waiting for changes in job %s", job_id)
1158
1159       status = job.CalcStatus()
1160       job_info = self._GetJobInfoUnlocked(job, fields)
1161       log_entries = job.GetLogEntries(prev_log_serial)
1162
1163       # Serializing and deserializing data can cause type changes (e.g. from
1164       # tuple to list) or precision loss. We're doing it here so that we get
1165       # the same modifications as the data received from the client. Without
1166       # this, the comparison afterwards might fail without the data being
1167       # significantly different.
1168       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1169       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1170
1171       # Don't even try to wait if the job is no longer running, there will be
1172       # no changes.
1173       if (status not in (constants.JOB_STATUS_QUEUED,
1174                          constants.JOB_STATUS_RUNNING,
1175                          constants.JOB_STATUS_WAITLOCK) or
1176           prev_job_info != job_info or
1177           (log_entries and prev_log_serial != log_entries[0][0])):
1178         logging.debug("Job %s changed", job_id)
1179         return (job_info, log_entries)
1180
1181       raise utils.RetryAgain()
1182
1183     try:
1184       # Setting wait function to release the queue lock while waiting
1185       return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1186                          wait_fn=job.change.wait)
1187     except utils.RetryTimeout:
1188       return constants.JOB_NOTCHANGED
1189
1190   @utils.LockedMethod
1191   @_RequireOpenQueue
1192   def CancelJob(self, job_id):
1193     """Cancels a job.
1194
1195     This will only succeed if the job has not started yet.
1196
1197     @type job_id: string
1198     @param job_id: job ID of job to be cancelled.
1199
1200     """
1201     logging.info("Cancelling job %s", job_id)
1202
1203     job = self._LoadJobUnlocked(job_id)
1204     if not job:
1205       logging.debug("Job %s not found", job_id)
1206       return (False, "Job %s not found" % job_id)
1207
1208     job_status = job.CalcStatus()
1209
1210     if job_status not in (constants.JOB_STATUS_QUEUED,
1211                           constants.JOB_STATUS_WAITLOCK):
1212       logging.debug("Job %s is no longer waiting in the queue", job.id)
1213       return (False, "Job %s is no longer waiting in the queue" % job.id)
1214
1215     if job_status == constants.JOB_STATUS_QUEUED:
1216       self.CancelJobUnlocked(job)
1217       return (True, "Job %s canceled" % job.id)
1218
1219     elif job_status == constants.JOB_STATUS_WAITLOCK:
1220       # The worker will notice the new status and cancel the job
1221       try:
1222         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1223       finally:
1224         self.UpdateJobUnlocked(job)
1225       return (True, "Job %s will be canceled" % job.id)
1226
1227   @_RequireOpenQueue
1228   def CancelJobUnlocked(self, job):
1229     """Marks a job as canceled.
1230
1231     """
1232     try:
1233       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1234                             "Job canceled by request")
1235     finally:
1236       self.UpdateJobUnlocked(job)
1237
1238   @_RequireOpenQueue
1239   def _ArchiveJobsUnlocked(self, jobs):
1240     """Archives jobs.
1241
1242     @type jobs: list of L{_QueuedJob}
1243     @param jobs: Job objects
1244     @rtype: int
1245     @return: Number of archived jobs
1246
1247     """
1248     archive_jobs = []
1249     rename_files = []
1250     for job in jobs:
1251       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1252                                   constants.JOB_STATUS_SUCCESS,
1253                                   constants.JOB_STATUS_ERROR):
1254         logging.debug("Job %s is not yet done", job.id)
1255         continue
1256
1257       archive_jobs.append(job)
1258
1259       old = self._GetJobPath(job.id)
1260       new = self._GetArchivedJobPath(job.id)
1261       rename_files.append((old, new))
1262
1263     # TODO: What if 1..n files fail to rename?
1264     self._RenameFilesUnlocked(rename_files)
1265
1266     logging.debug("Successfully archived job(s) %s",
1267                   utils.CommaJoin(job.id for job in archive_jobs))
1268
1269     return len(archive_jobs)
1270
1271   @utils.LockedMethod
1272   @_RequireOpenQueue
1273   def ArchiveJob(self, job_id):
1274     """Archives a job.
1275
1276     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1277
1278     @type job_id: string
1279     @param job_id: Job ID of job to be archived.
1280     @rtype: bool
1281     @return: Whether job was archived
1282
1283     """
1284     logging.info("Archiving job %s", job_id)
1285
1286     job = self._LoadJobUnlocked(job_id)
1287     if not job:
1288       logging.debug("Job %s not found", job_id)
1289       return False
1290
1291     return self._ArchiveJobsUnlocked([job]) == 1
1292
1293   @utils.LockedMethod
1294   @_RequireOpenQueue
1295   def AutoArchiveJobs(self, age, timeout):
1296     """Archives all jobs based on age.
1297
1298     The method will archive all jobs which are older than the age
1299     parameter. For jobs that don't have an end timestamp, the start
1300     timestamp will be considered. The special '-1' age will cause
1301     archival of all jobs (that are not running or queued).
1302
1303     @type age: int
1304     @param age: the minimum age in seconds
1305
1306     """
1307     logging.info("Archiving jobs with age more than %s seconds", age)
1308
1309     now = time.time()
1310     end_time = now + timeout
1311     archived_count = 0
1312     last_touched = 0
1313
1314     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1315     pending = []
1316     for idx, job_id in enumerate(all_job_ids):
1317       last_touched = idx
1318
1319       # Not optimal because jobs could be pending
1320       # TODO: Measure average duration for job archival and take number of
1321       # pending jobs into account.
1322       if time.time() > end_time:
1323         break
1324
1325       # Returns None if the job failed to load
1326       job = self._LoadJobUnlocked(job_id)
1327       if job:
1328         if job.end_timestamp is None:
1329           if job.start_timestamp is None:
1330             job_age = job.received_timestamp
1331           else:
1332             job_age = job.start_timestamp
1333         else:
1334           job_age = job.end_timestamp
1335
1336         if age == -1 or now - job_age[0] > age:
1337           pending.append(job)
1338
1339           # Archive 10 jobs at a time
1340           if len(pending) >= 10:
1341             archived_count += self._ArchiveJobsUnlocked(pending)
1342             pending = []
1343
1344     if pending:
1345       archived_count += self._ArchiveJobsUnlocked(pending)
1346
1347     return (archived_count, len(all_job_ids) - last_touched - 1)
1348
1349   @staticmethod
1350   def _GetJobInfoUnlocked(job, fields):
1351     """Returns information about a job.
1352
1353     @type job: L{_QueuedJob}
1354     @param job: the job which we query
1355     @type fields: list
1356     @param fields: names of fields to return
1357     @rtype: list
1358     @return: list with one element for each field
1359     @raise errors.OpExecError: when an invalid field
1360         has been passed
1361
1362     """
1363     row = []
1364     for fname in fields:
1365       if fname == "id":
1366         row.append(job.id)
1367       elif fname == "status":
1368         row.append(job.CalcStatus())
1369       elif fname == "ops":
1370         row.append([op.input.__getstate__() for op in job.ops])
1371       elif fname == "opresult":
1372         row.append([op.result for op in job.ops])
1373       elif fname == "opstatus":
1374         row.append([op.status for op in job.ops])
1375       elif fname == "oplog":
1376         row.append([op.log for op in job.ops])
1377       elif fname == "opstart":
1378         row.append([op.start_timestamp for op in job.ops])
1379       elif fname == "opend":
1380         row.append([op.end_timestamp for op in job.ops])
1381       elif fname == "received_ts":
1382         row.append(job.received_timestamp)
1383       elif fname == "start_ts":
1384         row.append(job.start_timestamp)
1385       elif fname == "end_ts":
1386         row.append(job.end_timestamp)
1387       elif fname == "lock_status":
1388         row.append(job.lock_status)
1389       elif fname == "summary":
1390         row.append([op.input.Summary() for op in job.ops])
1391       else:
1392         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1393     return row
1394
1395   @utils.LockedMethod
1396   @_RequireOpenQueue
1397   def QueryJobs(self, job_ids, fields):
1398     """Returns a list of jobs in queue.
1399
1400     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1401     processing for each job.
1402
1403     @type job_ids: list
1404     @param job_ids: sequence of job identifiers or None for all
1405     @type fields: list
1406     @param fields: names of fields to return
1407     @rtype: list
1408     @return: list one element per job, each element being list with
1409         the requested fields
1410
1411     """
1412     jobs = []
1413
1414     for job in self._GetJobsUnlocked(job_ids):
1415       if job is None:
1416         jobs.append(None)
1417       else:
1418         jobs.append(self._GetJobInfoUnlocked(job, fields))
1419
1420     return jobs
1421
1422   @utils.LockedMethod
1423   @_RequireOpenQueue
1424   def Shutdown(self):
1425     """Stops the job queue.
1426
1427     This shutdowns all the worker threads an closes the queue.
1428
1429     """
1430     self._wpool.TerminateWorkers()
1431
1432     self._queue_lock.Close()
1433     self._queue_lock = None