You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
56 lines
1.8 KiB
56 lines
1.8 KiB
import os |
|
import concurrent |
|
import queue |
|
import threading |
|
|
|
from sqlalchemy import create_engine |
|
from sqlalchemy.pool import StaticPool |
|
from sqlalchemy.orm import sessionmaker |
|
|
|
from .logging import Logger |
|
|
|
|
|
# https://stackoverflow.com/questions/26971050/sqlalchemy-sqlite-too-many-sql-variables |
|
SQLITE_LIMIT_VARIABLE_NUMBER = 999 |
|
|
|
|
|
def sql(func): |
|
"""wrapper for sql methods""" |
|
def wrapper(self, *args, **kwargs): |
|
assert threading.currentThread() != self.sql_thread |
|
f = concurrent.futures.Future() |
|
self.db_requests.put((f, func, args, kwargs)) |
|
return f.result(timeout=10) |
|
return wrapper |
|
|
|
class SqlDB(Logger): |
|
|
|
def __init__(self, network, path, base): |
|
Logger.__init__(self) |
|
self.base = base |
|
self.network = network |
|
self.path = path |
|
self.db_requests = queue.Queue() |
|
self.sql_thread = threading.Thread(target=self.run_sql) |
|
self.sql_thread.start() |
|
|
|
def run_sql(self): |
|
engine = create_engine('sqlite:///' + self.path, pool_reset_on_return=None, poolclass=StaticPool)#, echo=True) |
|
DBSession = sessionmaker(bind=engine, autoflush=False) |
|
self.DBSession = DBSession() |
|
if not os.path.exists(self.path): |
|
self.base.metadata.create_all(engine) |
|
while self.network.asyncio_loop.is_running(): |
|
try: |
|
future, func, args, kwargs = self.db_requests.get(timeout=0.1) |
|
except queue.Empty: |
|
continue |
|
try: |
|
result = func(self, *args, **kwargs) |
|
except BaseException as e: |
|
future.set_exception(e) |
|
continue |
|
future.set_result(result) |
|
# write |
|
self.DBSession.commit() |
|
self.logger.info("SQL thread terminated")
|
|
|