Fix a misuse of exc_info in logging.info
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import os
25 import logging
26 import threading
27 import errno
28 import re
29 import time
30
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import rpc
39
40
41 JOBQUEUE_THREADS = 5
42
43
44 class _QueuedOpCode(object):
45   """Encasulates an opcode object.
46
47   Access is synchronized by the '_lock' attribute.
48
49   The 'log' attribute holds the execution log and consists of tuples
50   of the form (timestamp, level, message).
51
52   """
53   def __new__(cls, *args, **kwargs):
54     obj = object.__new__(cls, *args, **kwargs)
55     # Create a special lock for logging
56     obj._log_lock = threading.Lock()
57     return obj
58
59   def __init__(self, op):
60     self.input = op
61     self.status = constants.OP_STATUS_QUEUED
62     self.result = None
63     self.log = []
64
65   @classmethod
66   def Restore(cls, state):
67     obj = _QueuedOpCode.__new__(cls)
68     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
69     obj.status = state["status"]
70     obj.result = state["result"]
71     obj.log = state["log"]
72     return obj
73
74   def Serialize(self):
75     self._log_lock.acquire()
76     try:
77       return {
78         "input": self.input.__getstate__(),
79         "status": self.status,
80         "result": self.result,
81         "log": self.log,
82         }
83     finally:
84       self._log_lock.release()
85
86   def Log(self, *args):
87     """Append a log entry.
88
89     """
90     assert len(args) < 3
91
92     if len(args) == 1:
93       log_type = constants.ELOG_MESSAGE
94       log_msg = args[0]
95     else:
96       log_type, log_msg = args
97
98     self._log_lock.acquire()
99     try:
100       self.log.append((time.time(), log_type, log_msg))
101     finally:
102       self._log_lock.release()
103
104   def RetrieveLog(self, start_at=0):
105     """Retrieve (a part of) the execution log.
106
107     """
108     self._log_lock.acquire()
109     try:
110       return self.log[start_at:]
111     finally:
112       self._log_lock.release()
113
114
115 class _QueuedJob(object):
116   """In-memory job representation.
117
118   This is what we use to track the user-submitted jobs.
119
120   """
121   def __init__(self, queue, job_id, ops):
122     if not ops:
123       # TODO
124       raise Exception("No opcodes")
125
126     self.queue = queue
127     self.id = job_id
128     self.ops = [_QueuedOpCode(op) for op in ops]
129     self.run_op_index = -1
130
131   @classmethod
132   def Restore(cls, queue, state):
133     obj = _QueuedJob.__new__(cls)
134     obj.queue = queue
135     obj.id = state["id"]
136     obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
137     obj.run_op_index = state["run_op_index"]
138     return obj
139
140   def Serialize(self):
141     return {
142       "id": self.id,
143       "ops": [op.Serialize() for op in self.ops],
144       "run_op_index": self.run_op_index,
145       }
146
147   def CalcStatus(self):
148     status = constants.JOB_STATUS_QUEUED
149
150     all_success = True
151     for op in self.ops:
152       if op.status == constants.OP_STATUS_SUCCESS:
153         continue
154
155       all_success = False
156
157       if op.status == constants.OP_STATUS_QUEUED:
158         pass
159       elif op.status == constants.OP_STATUS_RUNNING:
160         status = constants.JOB_STATUS_RUNNING
161       elif op.status == constants.OP_STATUS_ERROR:
162         status = constants.JOB_STATUS_ERROR
163         # The whole job fails if one opcode failed
164         break
165       elif op.status == constants.OP_STATUS_CANCELED:
166         status = constants.OP_STATUS_CANCELED
167         break
168
169     if all_success:
170       status = constants.JOB_STATUS_SUCCESS
171
172     return status
173
174
175 class _JobQueueWorker(workerpool.BaseWorker):
176   def RunTask(self, job):
177     """Job executor.
178
179     This functions processes a job.
180
181     """
182     logging.debug("Worker %s processing job %s",
183                   self.worker_id, job.id)
184     proc = mcpu.Processor(self.pool.context)
185     queue = job.queue
186     try:
187       try:
188         count = len(job.ops)
189         for idx, op in enumerate(job.ops):
190           try:
191             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
192
193             queue.acquire()
194             try:
195               job.run_op_index = idx
196               op.status = constants.OP_STATUS_RUNNING
197               op.result = None
198               queue.UpdateJobUnlocked(job)
199
200               input_opcode = op.input
201             finally:
202               queue.release()
203
204             result = proc.ExecOpCode(input_opcode, op.Log)
205
206             queue.acquire()
207             try:
208               op.status = constants.OP_STATUS_SUCCESS
209               op.result = result
210               queue.UpdateJobUnlocked(job)
211             finally:
212               queue.release()
213
214             logging.debug("Op %s/%s: Successfully finished %s",
215                           idx + 1, count, op)
216           except Exception, err:
217             queue.acquire()
218             try:
219               try:
220                 op.status = constants.OP_STATUS_ERROR
221                 op.result = str(err)
222                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
223               finally:
224                 queue.UpdateJobUnlocked(job)
225             finally:
226               queue.release()
227             raise
228
229       except errors.GenericError, err:
230         logging.exception("Ganeti exception")
231       except:
232         logging.exception("Unhandled exception")
233     finally:
234       queue.acquire()
235       try:
236         job_id = job.id
237         status = job.CalcStatus()
238       finally:
239         queue.release()
240       logging.debug("Worker %s finished job %s, status = %s",
241                     self.worker_id, job_id, status)
242
243
244 class _JobQueueWorkerPool(workerpool.WorkerPool):
245   def __init__(self, context):
246     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
247                                               _JobQueueWorker)
248     self.context = context
249
250
251 class JobQueue(object):
252   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
253
254   def __init__(self, context):
255     self._memcache = {}
256     self._my_hostname = utils.HostInfo().name
257
258     # Locking
259     self._lock = threading.Lock()
260     self.acquire = self._lock.acquire
261     self.release = self._lock.release
262
263     # Make sure our directories exists
264     for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
265       try:
266         os.mkdir(path, 0700)
267       except OSError, err:
268         if err.errno not in (errno.EEXIST, ):
269           raise
270
271     # Get queue lock
272     self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
273     try:
274       utils.LockFile(self.lock_fd)
275     except:
276       self.lock_fd.close()
277       raise
278
279     # Read version
280     try:
281       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
282     except IOError, err:
283       if err.errno not in (errno.ENOENT, ):
284         raise
285
286       # Setup a new queue
287       self._InitQueueUnlocked()
288
289       # Try to open again
290       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
291
292     try:
293       # Try to read version
294       version = int(version_fd.read(128))
295
296       # Verify version
297       if version != constants.JOB_QUEUE_VERSION:
298         raise errors.JobQueueError("Found version %s, expected %s",
299                                    version, constants.JOB_QUEUE_VERSION)
300     finally:
301       version_fd.close()
302
303     self._last_serial = self._ReadSerial()
304     if self._last_serial is None:
305       raise errors.ConfigurationError("Can't read/parse the job queue serial"
306                                       " file")
307
308     # Setup worker pool
309     self._wpool = _JobQueueWorkerPool(context)
310
311     # We need to lock here because WorkerPool.AddTask() may start a job while
312     # we're still doing our work.
313     self.acquire()
314     try:
315       for job in self._GetJobsUnlocked(None):
316         status = job.CalcStatus()
317
318         if status in (constants.JOB_STATUS_QUEUED, ):
319           self._wpool.AddTask(job)
320
321         elif status in (constants.JOB_STATUS_RUNNING, ):
322           logging.warning("Unfinished job %s found: %s", job.id, job)
323           try:
324             for op in job.ops:
325               op.status = constants.OP_STATUS_ERROR
326               op.result = "Unclean master daemon shutdown"
327           finally:
328             self.UpdateJobUnlocked(job)
329     finally:
330       self.release()
331
332   @staticmethod
333   def _ReadSerial():
334     """Try to read the job serial file.
335
336     @rtype: None or int
337     @return: If the serial can be read, then it is returned. Otherwise None
338              is returned.
339
340     """
341     try:
342       serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
343       try:
344         # Read last serial
345         serial = int(serial_fd.read(1024).strip())
346       finally:
347         serial_fd.close()
348     except (ValueError, EnvironmentError):
349       serial = None
350
351     return serial
352
353   def _InitQueueUnlocked(self):
354     assert self.lock_fd, "Queue should be open"
355
356     utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
357                     data="%s\n" % constants.JOB_QUEUE_VERSION)
358     if self._ReadSerial() is None:
359       utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
360                       data="%s\n" % 0)
361
362   def _FormatJobID(self, job_id):
363     if not isinstance(job_id, (int, long)):
364       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
365     if job_id < 0:
366       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
367
368     return str(job_id)
369
370   def _NewSerialUnlocked(self, nodes):
371     """Generates a new job identifier.
372
373     Job identifiers are unique during the lifetime of a cluster.
374
375     Returns: A string representing the job identifier.
376
377     """
378     assert self.lock_fd, "Queue should be open"
379
380     # New number
381     serial = self._last_serial + 1
382
383     # Write to file
384     utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
385                     data="%s\n" % serial)
386
387     # Keep it only if we were able to write the file
388     self._last_serial = serial
389
390     # Distribute the serial to the other nodes
391     try:
392       nodes.remove(self._my_hostname)
393     except ValueError:
394       pass
395
396     result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
397     for node in nodes:
398       if not result[node]:
399         logging.error("copy of job queue file to node %s failed", node)
400
401     return self._FormatJobID(serial)
402
403   @staticmethod
404   def _GetJobPath(job_id):
405     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
406
407   @staticmethod
408   def _GetArchivedJobPath(job_id):
409     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
410
411   @classmethod
412   def _ExtractJobID(cls, name):
413     m = cls._RE_JOB_FILE.match(name)
414     if m:
415       return m.group(1)
416     else:
417       return None
418
419   def _GetJobIDsUnlocked(self, archived=False):
420     """Return all known job IDs.
421
422     If the parameter archived is True, archived jobs IDs will be
423     included. Currently this argument is unused.
424
425     The method only looks at disk because it's a requirement that all
426     jobs are present on disk (so in the _memcache we don't have any
427     extra IDs).
428
429     """
430     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
431     jlist.sort()
432     return jlist
433
434   def _ListJobFiles(self):
435     assert self.lock_fd, "Queue should be open"
436
437     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
438             if self._RE_JOB_FILE.match(name)]
439
440   def _LoadJobUnlocked(self, job_id):
441     assert self.lock_fd, "Queue should be open"
442
443     if job_id in self._memcache:
444       logging.debug("Found job %s in memcache", job_id)
445       return self._memcache[job_id]
446
447     filepath = self._GetJobPath(job_id)
448     logging.debug("Loading job from %s", filepath)
449     try:
450       fd = open(filepath, "r")
451     except IOError, err:
452       if err.errno in (errno.ENOENT, ):
453         return None
454       raise
455     try:
456       data = serializer.LoadJson(fd.read())
457     finally:
458       fd.close()
459
460     job = _QueuedJob.Restore(self, data)
461     self._memcache[job_id] = job
462     logging.debug("Added job %s to the cache", job_id)
463     return job
464
465   def _GetJobsUnlocked(self, job_ids):
466     if not job_ids:
467       job_ids = self._GetJobIDsUnlocked()
468
469     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
470
471   @utils.LockedMethod
472   def SubmitJob(self, ops, nodes):
473     """Create and store a new job.
474
475     This enters the job into our job queue and also puts it on the new
476     queue, in order for it to be picked up by the queue processors.
477
478     @type ops: list
479     @param ops: The list of OpCodes that will become the new job.
480     @type nodes: list
481     @param nodes: The list of nodes to which the new job serial will be
482                   distributed.
483
484     """
485     assert self.lock_fd, "Queue should be open"
486
487     # Get job identifier
488     job_id = self._NewSerialUnlocked(nodes)
489     job = _QueuedJob(self, job_id, ops)
490
491     # Write to disk
492     self.UpdateJobUnlocked(job)
493
494     logging.debug("Added new job %s to the cache", job_id)
495     self._memcache[job_id] = job
496
497     # Add to worker pool
498     self._wpool.AddTask(job)
499
500     return job.id
501
502   def UpdateJobUnlocked(self, job):
503     assert self.lock_fd, "Queue should be open"
504
505     filename = self._GetJobPath(job.id)
506     logging.debug("Writing job %s to %s", job.id, filename)
507     utils.WriteFile(filename,
508                     data=serializer.DumpJson(job.Serialize(), indent=False))
509     self._CleanCacheUnlocked([job.id])
510
511   def _CleanCacheUnlocked(self, exclude):
512     """Clean the memory cache.
513
514     The exceptions argument contains job IDs that should not be
515     cleaned.
516
517     """
518     assert isinstance(exclude, list)
519
520     for job in self._memcache.values():
521       if job.id in exclude:
522         continue
523       if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
524                                   constants.JOB_STATUS_RUNNING):
525         logging.debug("Cleaning job %s from the cache", job.id)
526         try:
527           del self._memcache[job.id]
528         except KeyError:
529           pass
530
531   @utils.LockedMethod
532   def CancelJob(self, job_id):
533     """Cancels a job.
534
535     @type job_id: string
536     @param job_id: Job ID of job to be cancelled.
537
538     """
539     logging.debug("Cancelling job %s", job_id)
540
541     job = self._LoadJobUnlocked(job_id)
542     if not job:
543       logging.debug("Job %s not found", job_id)
544       return
545
546     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
547       logging.debug("Job %s is no longer in the queue", job.id)
548       return
549
550     try:
551       for op in job.ops:
552         op.status = constants.OP_STATUS_ERROR
553         op.result = "Job cancelled by request"
554     finally:
555       self.UpdateJobUnlocked(job)
556
557   @utils.LockedMethod
558   def ArchiveJob(self, job_id):
559     """Archives a job.
560
561     @type job_id: string
562     @param job_id: Job ID of job to be archived.
563
564     """
565     logging.debug("Archiving job %s", job_id)
566
567     job = self._LoadJobUnlocked(job_id)
568     if not job:
569       logging.debug("Job %s not found", job_id)
570       return
571
572     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
573                                 constants.JOB_STATUS_SUCCESS,
574                                 constants.JOB_STATUS_ERROR):
575       logging.debug("Job %s is not yet done", job.id)
576       return
577
578     try:
579       old = self._GetJobPath(job.id)
580       new = self._GetArchivedJobPath(job.id)
581
582       os.rename(old, new)
583
584       logging.debug("Successfully archived job %s", job.id)
585     finally:
586       # Cleaning the cache because we don't know what os.rename actually did
587       # and to be on the safe side.
588       self._CleanCacheUnlocked([])
589
590   def _GetJobInfoUnlocked(self, job, fields):
591     row = []
592     for fname in fields:
593       if fname == "id":
594         row.append(job.id)
595       elif fname == "status":
596         row.append(job.CalcStatus())
597       elif fname == "ops":
598         row.append([op.input.__getstate__() for op in job.ops])
599       elif fname == "opresult":
600         row.append([op.result for op in job.ops])
601       elif fname == "opstatus":
602         row.append([op.status for op in job.ops])
603       elif fname == "ticker":
604         ji = job.run_op_index
605         if ji < 0:
606           lmsg = None
607         else:
608           lmsg = job.ops[ji].RetrieveLog(-1)
609           # message might be empty here
610           if lmsg:
611             lmsg = lmsg[0]
612           else:
613             lmsg = None
614         row.append(lmsg)
615       else:
616         raise errors.OpExecError("Invalid job query field '%s'" % fname)
617     return row
618
619   @utils.LockedMethod
620   def QueryJobs(self, job_ids, fields):
621     """Returns a list of jobs in queue.
622
623     Args:
624     - job_ids: Sequence of job identifiers or None for all
625     - fields: Names of fields to return
626
627     """
628     jobs = []
629
630     for job in self._GetJobsUnlocked(job_ids):
631       if job is None:
632         jobs.append(None)
633       else:
634         jobs.append(self._GetJobInfoUnlocked(job, fields))
635
636     return jobs
637
638   @utils.LockedMethod
639   def Shutdown(self):
640     """Stops the job queue.
641
642     """
643     assert self.lock_fd, "Queue should be open"
644
645     self._wpool.TerminateWorkers()
646
647     self.lock_fd.close()
648     self.lock_fd = None