diff --git a/lib/util.py b/lib/util.py index 9837ee06c..f39fb39a2 100644 --- a/lib/util.py +++ b/lib/util.py @@ -3,6 +3,7 @@ import platform import shutil from datetime import datetime from decimal import Decimal +import traceback import urlparse import urllib import threading @@ -24,6 +25,20 @@ class MyEncoder(json.JSONEncoder): return obj.as_dict() return super(MyEncoder, self).default(obj) +class ThreadJob: + """A job that is run periodically from a thread's main loop. run() is + called from that thread's context. + """ + + def print_error(self, *msg): + print_error("[%s]" % self.__class__.__name__, *msg) + + def print_msg(self, *msg): + print_msg("[%s]" % self.__class__.__name__, *msg) + + def run(self): + """Called periodically from the thread""" + pass class DaemonThread(threading.Thread): """ daemon thread that terminates cleanly """ @@ -33,6 +48,27 @@ class DaemonThread(threading.Thread): self.parent_thread = threading.currentThread() self.running = False self.running_lock = threading.Lock() + self.job_lock = threading.Lock() + self.jobs = [] + + def add_job(self, job): + with self.job_lock: + self.jobs.append(job) + + def run_jobs(self): + # Don't let a throwing job disrupt the thread, future runs of + # itself, or other jobs. This is useful protection against + # malformed or malicious server responses + with self.job_lock: + for job in self.jobs: + try: + job.run() + except: + traceback.print_exc(file=sys.stderr) + + def remove_job(self, job): + with self.job_lock: + self.jobs.remove(job) def start(self): with self.running_lock: @@ -337,7 +373,6 @@ import socket import errno import json import ssl -import traceback import time class SocketPipe: