Fix logging with string job IDs
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import os
25 import logging
26 import threading
27 import errno
28 import re
29 import time
30
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import rpc
39
40
41 JOBQUEUE_THREADS = 5
42
43
44 class _QueuedOpCode(object):
45   """Encasulates an opcode object.
46
47   Access is synchronized by the '_lock' attribute.
48
49   The 'log' attribute holds the execution log and consists of tuples
50   of the form (timestamp, level, message).
51
52   """
53   def __init__(self, op):
54     self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
55
56   def __Setup(self, input_, status, result, log):
57     self._lock = threading.Lock()
58     self.input = input_
59     self.status = status
60     self.result = result
61     self.log = log
62
63   @classmethod
64   def Restore(cls, state):
65     obj = object.__new__(cls)
66     obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
67                 state["status"], state["result"], state["log"])
68     return obj
69
70   @utils.LockedMethod
71   def Serialize(self):
72     return {
73       "input": self.input.__getstate__(),
74       "status": self.status,
75       "result": self.result,
76       "log": self.log,
77       }
78
79   @utils.LockedMethod
80   def GetInput(self):
81     """Returns the original opcode.
82
83     """
84     return self.input
85
86   @utils.LockedMethod
87   def SetStatus(self, status, result):
88     """Update the opcode status and result.
89
90     """
91     self.status = status
92     self.result = result
93
94   @utils.LockedMethod
95   def GetStatus(self):
96     """Get the opcode status.
97
98     """
99     return self.status
100
101   @utils.LockedMethod
102   def GetResult(self):
103     """Get the opcode result.
104
105     """
106     return self.result
107
108   @utils.LockedMethod
109   def Log(self, *args):
110     """Append a log entry.
111
112     """
113     assert len(args) < 2
114
115     if len(args) == 1:
116       log_type = constants.ELOG_MESSAGE
117       log_msg = args[0]
118     else:
119       log_type, log_msg = args
120     self.log.append((time.time(), log_type, log_msg))
121
122   @utils.LockedMethod
123   def RetrieveLog(self, start_at=0):
124     """Retrieve (a part of) the execution log.
125
126     """
127     return self.log[start_at:]
128
129
130 class _QueuedJob(object):
131   """In-memory job representation.
132
133   This is what we use to track the user-submitted jobs.
134
135   """
136   def __init__(self, storage, job_id, ops):
137     if not ops:
138       # TODO
139       raise Exception("No opcodes")
140
141     self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
142
143   def __Setup(self, storage, job_id, ops, run_op_index):
144     self._lock = threading.Lock()
145     self.storage = storage
146     self.id = job_id
147     self._ops = ops
148     self.run_op_index = run_op_index
149
150   @classmethod
151   def Restore(cls, storage, state):
152     obj = object.__new__(cls)
153     op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
154     obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
155     return obj
156
157   def Serialize(self):
158     return {
159       "id": self.id,
160       "ops": [op.Serialize() for op in self._ops],
161       "run_op_index": self.run_op_index,
162       }
163
164   def SetUnclean(self, msg):
165     try:
166       for op in self._ops:
167         op.SetStatus(constants.OP_STATUS_ERROR, msg)
168     finally:
169       self.storage.UpdateJob(self)
170
171   def GetStatus(self):
172     status = constants.JOB_STATUS_QUEUED
173
174     all_success = True
175     for op in self._ops:
176       op_status = op.GetStatus()
177       if op_status == constants.OP_STATUS_SUCCESS:
178         continue
179
180       all_success = False
181
182       if op_status == constants.OP_STATUS_QUEUED:
183         pass
184       elif op_status == constants.OP_STATUS_RUNNING:
185         status = constants.JOB_STATUS_RUNNING
186       elif op_status == constants.OP_STATUS_ERROR:
187         status = constants.JOB_STATUS_ERROR
188         # The whole job fails if one opcode failed
189         break
190
191     if all_success:
192       status = constants.JOB_STATUS_SUCCESS
193
194     return status
195
196   @utils.LockedMethod
197   def GetRunOpIndex(self):
198     return self.run_op_index
199
200   def Run(self, proc):
201     """Job executor.
202
203     This functions processes a this job in the context of given processor
204     instance.
205
206     Args:
207     - proc: Ganeti Processor to run the job with
208
209     """
210     try:
211       count = len(self._ops)
212       for idx, op in enumerate(self._ops):
213         try:
214           logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
215
216           self._lock.acquire()
217           try:
218             self.run_op_index = idx
219           finally:
220             self._lock.release()
221
222           op.SetStatus(constants.OP_STATUS_RUNNING, None)
223           self.storage.UpdateJob(self)
224
225           result = proc.ExecOpCode(op.input, op.Log)
226
227           op.SetStatus(constants.OP_STATUS_SUCCESS, result)
228           self.storage.UpdateJob(self)
229           logging.debug("Op %s/%s: Successfully finished %s",
230                         idx + 1, count, op)
231         except Exception, err:
232           try:
233             op.SetStatus(constants.OP_STATUS_ERROR, str(err))
234             logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
235           finally:
236             self.storage.UpdateJob(self)
237           raise
238
239     except errors.GenericError, err:
240       logging.error("ganeti exception %s", exc_info=err)
241     except Exception, err:
242       logging.error("unhandled exception %s", exc_info=err)
243     except:
244       logging.error("unhandled unknown exception %s", exc_info=err)
245
246
247 class _JobQueueWorker(workerpool.BaseWorker):
248   def RunTask(self, job):
249     logging.debug("Worker %s processing job %s",
250                   self.worker_id, job.id)
251     # TODO: feedback function
252     proc = mcpu.Processor(self.pool.context)
253     try:
254       job.Run(proc)
255     finally:
256       logging.debug("Worker %s finished job %s, status = %s",
257                     self.worker_id, job.id, job.GetStatus())
258
259
260 class _JobQueueWorkerPool(workerpool.WorkerPool):
261   def __init__(self, context):
262     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
263                                               _JobQueueWorker)
264     self.context = context
265
266
267 class JobStorage(object):
268   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
269
270   def __init__(self):
271     self._lock = threading.Lock()
272     self._memcache = {}
273     self._my_hostname = utils.HostInfo().name
274
275     # Make sure our directory exists
276     try:
277       os.mkdir(constants.QUEUE_DIR, 0700)
278     except OSError, err:
279       if err.errno not in (errno.EEXIST, ):
280         raise
281
282     # Get queue lock
283     self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
284     try:
285       utils.LockFile(self.lock_fd)
286     except:
287       self.lock_fd.close()
288       raise
289
290     # Read version
291     try:
292       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
293     except IOError, err:
294       if err.errno not in (errno.ENOENT, ):
295         raise
296
297       # Setup a new queue
298       self._InitQueueUnlocked()
299
300       # Try to open again
301       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
302
303     try:
304       # Try to read version
305       version = int(version_fd.read(128))
306
307       # Verify version
308       if version != constants.JOB_QUEUE_VERSION:
309         raise errors.JobQueueError("Found version %s, expected %s",
310                                    version, constants.JOB_QUEUE_VERSION)
311     finally:
312       version_fd.close()
313
314     self._last_serial = self._ReadSerial()
315     if self._last_serial is None:
316       raise errors.ConfigurationError("Can't read/parse the job queue serial"
317                                       " file")
318
319   @staticmethod
320   def _ReadSerial():
321     """Try to read the job serial file.
322
323     @rtype: None or int
324     @return: If the serial can be read, then it is returned. Otherwise None
325              is returned.
326
327     """
328     try:
329       serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
330       try:
331         # Read last serial
332         serial = int(serial_fd.read(1024).strip())
333       finally:
334         serial_fd.close()
335     except (ValueError, EnvironmentError):
336       serial = None
337
338     return serial
339
340   def Close(self):
341     assert self.lock_fd, "Queue should be open"
342
343     self.lock_fd.close()
344     self.lock_fd = None
345
346   def _InitQueueUnlocked(self):
347     assert self.lock_fd, "Queue should be open"
348
349     utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
350                     data="%s\n" % constants.JOB_QUEUE_VERSION)
351     if self._ReadSerial() is None:
352       utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
353                       data="%s\n" % 0)
354
355   def _NewSerialUnlocked(self, nodes):
356     """Generates a new job identifier.
357
358     Job identifiers are unique during the lifetime of a cluster.
359
360     Returns: A string representing the job identifier.
361
362     """
363     assert self.lock_fd, "Queue should be open"
364
365     # New number
366     serial = self._last_serial + 1
367
368     # Write to file
369     utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
370                     data="%s\n" % serial)
371
372     # Keep it only if we were able to write the file
373     self._last_serial = serial
374
375     # Distribute the serial to the other nodes
376     try:
377       nodes.remove(self._my_hostname)
378     except ValueError:
379       pass
380
381     result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
382     for node in nodes:
383       if not result[node]:
384         logging.error("copy of job queue file to node %s failed", node)
385
386     return str(serial)
387
388   def _GetJobPath(self, job_id):
389     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
390
391   def _GetJobIDsUnlocked(self, archived=False):
392     """Return all known job IDs.
393
394     If the parameter archived is True, archived jobs IDs will be
395     included. Currently this argument is unused.
396
397     The method only looks at disk because it's a requirement that all
398     jobs are present on disk (so in the _memcache we don't have any
399     extra IDs).
400
401     """
402     jfiles = self._ListJobFiles()
403     jlist = [int(m.group(1)) for m in
404              [self._RE_JOB_FILE.match(name) for name in jfiles]]
405     jlist.sort()
406     return jlist
407
408   def _ListJobFiles(self):
409     assert self.lock_fd, "Queue should be open"
410
411     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
412             if self._RE_JOB_FILE.match(name)]
413
414   def _LoadJobUnlocked(self, job_id):
415     assert self.lock_fd, "Queue should be open"
416
417     if job_id in self._memcache:
418       logging.debug("Found job %s in memcache", job_id)
419       return self._memcache[job_id]
420
421     filepath = self._GetJobPath(job_id)
422     logging.debug("Loading job from %s", filepath)
423     try:
424       fd = open(filepath, "r")
425     except IOError, err:
426       if err.errno in (errno.ENOENT, ):
427         return None
428       raise
429     try:
430       data = serializer.LoadJson(fd.read())
431     finally:
432       fd.close()
433
434     job = _QueuedJob.Restore(self, data)
435     self._memcache[job_id] = job
436     logging.debug("Added job %s to the cache", job_id)
437     return job
438
439   def _GetJobsUnlocked(self, job_ids):
440     if not job_ids:
441       job_ids = self._GetJobIDsUnlocked()
442
443     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
444
445   @utils.LockedMethod
446   def GetJobs(self, job_ids):
447     return self._GetJobsUnlocked(job_ids)
448
449   @utils.LockedMethod
450   def AddJob(self, ops, nodes):
451     """Create and store on disk a new job.
452
453     @type ops: list
454     @param ops: The list of OpCodes that will become the new job.
455     @type nodes: list
456     @param nodes: The list of nodes to which the new job serial will be
457                   distributed.
458
459     """
460     assert self.lock_fd, "Queue should be open"
461
462     # Get job identifier
463     job_id = self._NewSerialUnlocked(nodes)
464     job = _QueuedJob(self, job_id, ops)
465
466     # Write to disk
467     self._UpdateJobUnlocked(job)
468
469     logging.debug("Added new job %s to the cache", job_id)
470     self._memcache[job_id] = job
471
472     return job
473
474   def _UpdateJobUnlocked(self, job):
475     assert self.lock_fd, "Queue should be open"
476
477     filename = self._GetJobPath(job.id)
478     logging.debug("Writing job %s to %s", job.id, filename)
479     utils.WriteFile(filename,
480                     data=serializer.DumpJson(job.Serialize(), indent=False))
481     self._CleanCacheUnlocked([job.id])
482
483   def _CleanCacheUnlocked(self, exclude):
484     """Clean the memory cache.
485
486     The exceptions argument contains job IDs that should not be
487     cleaned.
488
489     """
490     assert isinstance(exclude, list)
491     for job in self._memcache.values():
492       if job.id in exclude:
493         continue
494       if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
495                                  constants.JOB_STATUS_RUNNING):
496         logging.debug("Cleaning job %s from the cache", job.id)
497         try:
498           del self._memcache[job.id]
499         except KeyError:
500           pass
501
502   @utils.LockedMethod
503   def UpdateJob(self, job):
504     return self._UpdateJobUnlocked(job)
505
506   def ArchiveJob(self, job_id):
507     raise NotImplementedError()
508
509
510 class JobQueue:
511   """The job queue.
512
513    """
514   def __init__(self, context):
515     self._lock = threading.Lock()
516     self._jobs = JobStorage()
517     self._wpool = _JobQueueWorkerPool(context)
518
519     for job in self._jobs.GetJobs(None):
520       status = job.GetStatus()
521       if status in (constants.JOB_STATUS_QUEUED, ):
522         self._wpool.AddTask(job)
523
524       elif status in (constants.JOB_STATUS_RUNNING, ):
525         logging.warning("Unfinished job %s found: %s", job.id, job)
526         job.SetUnclean("Unclean master daemon shutdown")
527
528   @utils.LockedMethod
529   def SubmitJob(self, ops, nodes):
530     """Add a new job to the queue.
531
532     This enters the job into our job queue and also puts it on the new
533     queue, in order for it to be picked up by the queue processors.
534
535     @type ops: list
536     @param ops: the sequence of opcodes that will become the new job
537     @type nodes: list
538     @param nodes: the list of nodes to which the queue should be
539                   distributed
540
541     """
542     job = self._jobs.AddJob(ops, nodes)
543
544     # Add to worker pool
545     self._wpool.AddTask(job)
546
547     return job.id
548
549   def ArchiveJob(self, job_id):
550     raise NotImplementedError()
551
552   def CancelJob(self, job_id):
553     raise NotImplementedError()
554
555   def _GetJobInfo(self, job, fields):
556     row = []
557     for fname in fields:
558       if fname == "id":
559         row.append(job.id)
560       elif fname == "status":
561         row.append(job.GetStatus())
562       elif fname == "ops":
563         row.append([op.GetInput().__getstate__() for op in job._ops])
564       elif fname == "opresult":
565         row.append([op.GetResult() for op in job._ops])
566       elif fname == "opstatus":
567         row.append([op.GetStatus() for op in job._ops])
568       elif fname == "ticker":
569         ji = job.GetRunOpIndex()
570         if ji < 0:
571           lmsg = None
572         else:
573           lmsg = job._ops[ji].RetrieveLog(-1)
574           # message might be empty here
575           if lmsg:
576             lmsg = lmsg[0]
577           else:
578             lmsg = None
579         row.append(lmsg)
580       else:
581         raise errors.OpExecError("Invalid job query field '%s'" % fname)
582     return row
583
584   def QueryJobs(self, job_ids, fields):
585     """Returns a list of jobs in queue.
586
587     Args:
588     - job_ids: Sequence of job identifiers or None for all
589     - fields: Names of fields to return
590
591     """
592     self._lock.acquire()
593     try:
594       jobs = []
595
596       for job in self._jobs.GetJobs(job_ids):
597         if job is None:
598           jobs.append(None)
599         else:
600           jobs.append(self._GetJobInfo(job, fields))
601
602       return jobs
603     finally:
604       self._lock.release()
605
606   @utils.LockedMethod
607   def Shutdown(self):
608     """Stops the job queue.
609
610     """
611     self._wpool.TerminateWorkers()
612     self._jobs.Close()