|
|
|
@ -305,7 +305,7 @@ class ChannelDB(SqlDB): |
|
|
|
self.DBSession.commit() |
|
|
|
self.DBSession.commit() |
|
|
|
|
|
|
|
|
|
|
|
@sql |
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
#@profiler |
|
|
|
def on_channel_announcement(self, msg_payloads, trusted=False): |
|
|
|
def on_channel_announcement(self, msg_payloads, trusted=False): |
|
|
|
if type(msg_payloads) is dict: |
|
|
|
if type(msg_payloads) is dict: |
|
|
|
msg_payloads = [msg_payloads] |
|
|
|
msg_payloads = [msg_payloads] |
|
|
|
@ -335,12 +335,15 @@ class ChannelDB(SqlDB): |
|
|
|
|
|
|
|
|
|
|
|
@sql |
|
|
|
@sql |
|
|
|
def get_last_timestamp(self): |
|
|
|
def get_last_timestamp(self): |
|
|
|
|
|
|
|
return self._get_last_timestamp() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_last_timestamp(self): |
|
|
|
from sqlalchemy.sql import func |
|
|
|
from sqlalchemy.sql import func |
|
|
|
r = self.DBSession.query(func.max(Policy.timestamp).label('max_timestamp')).one() |
|
|
|
r = self.DBSession.query(func.max(Policy.timestamp).label('max_timestamp')).one() |
|
|
|
return r.max_timestamp or 0 |
|
|
|
return r.max_timestamp or 0 |
|
|
|
|
|
|
|
|
|
|
|
@sql |
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
#@profiler |
|
|
|
def on_channel_update(self, msg_payloads, trusted=False): |
|
|
|
def on_channel_update(self, msg_payloads, trusted=False): |
|
|
|
if type(msg_payloads) is dict: |
|
|
|
if type(msg_payloads) is dict: |
|
|
|
msg_payloads = [msg_payloads] |
|
|
|
msg_payloads = [msg_payloads] |
|
|
|
@ -372,16 +375,18 @@ class ChannelDB(SqlDB): |
|
|
|
if p and p.timestamp >= new_policy.timestamp: |
|
|
|
if p and p.timestamp >= new_policy.timestamp: |
|
|
|
continue |
|
|
|
continue |
|
|
|
new_policies[(short_channel_id, node_id)] = new_policy |
|
|
|
new_policies[(short_channel_id, node_id)] = new_policy |
|
|
|
self.print_error('on_channel_update: %d/%d'%(len(new_policies), len(msg_payloads))) |
|
|
|
#self.print_error('on_channel_update: %d/%d'%(len(new_policies), len(msg_payloads))) |
|
|
|
# commit pending removals |
|
|
|
# commit pending removals |
|
|
|
self.DBSession.commit() |
|
|
|
self.DBSession.commit() |
|
|
|
# add and commit new policies |
|
|
|
# add and commit new policies |
|
|
|
for new_policy in new_policies.values(): |
|
|
|
for new_policy in new_policies.values(): |
|
|
|
self.DBSession.add(new_policy) |
|
|
|
self.DBSession.add(new_policy) |
|
|
|
self.DBSession.commit() |
|
|
|
self.DBSession.commit() |
|
|
|
|
|
|
|
if new_policies: |
|
|
|
|
|
|
|
self.print_error('last timestamp:', datetime.fromtimestamp(self._get_last_timestamp()).ctime()) |
|
|
|
|
|
|
|
|
|
|
|
@sql |
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
#@profiler |
|
|
|
def on_node_announcement(self, msg_payloads): |
|
|
|
def on_node_announcement(self, msg_payloads): |
|
|
|
if type(msg_payloads) is dict: |
|
|
|
if type(msg_payloads) is dict: |
|
|
|
msg_payloads = [msg_payloads] |
|
|
|
msg_payloads = [msg_payloads] |
|
|
|
@ -414,7 +419,7 @@ class ChannelDB(SqlDB): |
|
|
|
new_nodes[node_id] = node_info |
|
|
|
new_nodes[node_id] = node_info |
|
|
|
for addr in node_addresses: |
|
|
|
for addr in node_addresses: |
|
|
|
new_addresses[(addr.node_id,addr.host,addr.port)] = addr |
|
|
|
new_addresses[(addr.node_id,addr.host,addr.port)] = addr |
|
|
|
self.print_error("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads))) |
|
|
|
#self.print_error("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads))) |
|
|
|
for node_info in new_nodes.values(): |
|
|
|
for node_info in new_nodes.values(): |
|
|
|
self.DBSession.add(node_info) |
|
|
|
self.DBSession.add(node_info) |
|
|
|
for new_addr in new_addresses.values(): |
|
|
|
for new_addr in new_addresses.values(): |
|
|
|
|