11#!/usr/bin/env python3
22from inotify import constants
33from inotify .adapters import Inotify
4+ import os
45from pyln .client import Plugin
6+ import pika
57from sqlalchemy import create_engine
68from sqlalchemy import desc
79from sqlalchemy .orm import sessionmaker
@@ -139,28 +141,117 @@ def tail(self):
139141 continue
140142
141143
144+ def encode_varint (value ):
145+ """Encode a varint value"""
146+ result = bytearray ()
147+ while value >= 128 :
148+ result .append ((value & 0x7F ) | 0x80 )
149+ value >>= 7
150+ result .append (value )
151+ return bytes (result )
152+
153+
154+ def field_prefix (index : int , wire_type : int ) -> bytes :
155+ """The T part of the TLV for protobuf encoded fields.
156+ Bits 0-2 are the type, while greater bits are the varint encoded field index.
157+ 0 VARINT int32, int64, uint32, uint64, sint32, sint64, bool, enum
158+ 1 I64 fixed64, sfixed64, double
159+ 2 LEN string, bytes, embedded messages, packed repeated fields
160+ 3 SGROUP group start (deprecated)
161+ 4 EGROUP group end (deprecated)
162+ 5 I32 fixed32, sfixed32, float"""
163+ return encode_varint (index << 3 | wire_type )
164+
165+
166+ def length_delimited (data : bytes ) -> bytes :
167+ """The LV part of the TLV for protobuf encoded fields."""
168+ if not data :
169+ return b'\x00 '
170+ return encode_varint (len (data )) + data
171+
172+
173+ def serialize (msg : bytes , node_id : str , network : str ) -> bytes :
174+ # from GL proto/internal.proto:
175+ # message GossipMessage {
176+ # // The raw message as seen on the wire.
177+ # bytes raw = 1;
178+ #
179+ # // For private messages such as local addition of a channel we
180+ # // want to restrict to the node that originated the message.
181+ # bytes node_id = 2;
182+ #
183+ # // Which network was the client configured to follow?
184+ # Network network = 3;
185+ #
186+ # // Which peer of the node sent this message?
187+ # bytes peer_id = 4;
188+ # }
189+ network_encoding = {"bitcoin" : 0 , "testnet" : 1 , "regtest" : 2 , "signet" : 3 }
190+ if network in network_encoding :
191+ active_network = network_encoding [network ]
192+ else :
193+ active_network = 2
194+ output = bytearray ()
195+ output .extend (field_prefix (1 , 2 )) # raw message tag
196+ output .extend (length_delimited (msg )) # raw msg field
197+ output .extend (field_prefix (2 , 2 )) # node_id tag
198+ output .extend (length_delimited (None )) # leave this empty - all public.
199+ output .extend (field_prefix (3 , 0 )) # network in an enum
200+ output .extend (length_delimited (active_network .to_bytes ())) # network field
201+ output .extend (field_prefix (4 , 2 )) # peer_id tag
202+ if node_id :
203+ # Add our node_id if we have it (so we know who to blame.)
204+ output .extend (length_delimited (node_id .encode ("utf-8" )))
205+ else :
206+ output .extend (length_delimited (None )) # our node id not available
207+
208+ return output
209+
210+
142211class Flusher (Thread ):
143212 def __init__ (self , engine ):
144213 Thread .__init__ (self )
145214 self .engine = engine
146215 self .session_maker = sessionmaker (bind = engine )
147216 self .session = None
217+ self .RABBITMQ_URL = os .environ .get ("RABBITMQ_URL" )
218+ self .connection = None
219+ my_info = plugin .rpc .getinfo ()
220+ if "id" in my_info :
221+ self .node_id = my_info ["id" ]
222+ else :
223+ self .node_id = None
224+ if "network" in my_info :
225+ self .network = my_info ["network" ]
226+ else :
227+ self .network = None
228+
229+ def rabbitmq_connect (self ):
230+ params = pika .URLParameters (self .RABBITMQ_URL )
231+ self .connection = pika .BlockingConnection (params ) # default, localhost
232+ self .channel = self .connection .channel ()
233+ plugin .log (f"message queue connected to { params .host } :{ params .port } " )
148234
149235 def run (self ):
150236 logging .info ("Starting flusher" )
151237 ft = FileTailer ('gossip_store' )
152238 last_flush = time .time ()
239+ total = 0
153240
154241 self .session = self .session_maker ()
155242 for i , e in enumerate (ft .tail ()):
156243 self .store (e )
244+ self .publish (e )
157245
158246 if last_flush < time .time () - 10 :
159247 self .session .commit ()
160248 self .session = self .session_maker ()
161249 last_flush = time .time ()
162250
163- logging .warn ("Filetailer exited..." )
251+ plugin .log ("Filetailer exited..." , level = "warn" )
252+ if self .connection :
253+ self .connection .close ()
254+ plugin .log ("Rabbitmq connection closed." , level = "warn" )
164255
165256 def store (self , raw : bytes ) -> None :
166257 try :
@@ -180,7 +271,39 @@ def store(self, raw: bytes) -> None:
180271
181272 self .session .merge (cls .from_gossip (msg , raw ))
182273 except Exception as e :
183- logging .warn (f"Exception parsing gossip message: { e } " )
274+ logging .warning (f"Exception parsing gossip message: { e } " )
275+
276+ def publish (self , raw : bytes ) -> None :
277+ """Serialize and publish a gossip message to a rabbitmq exchange."""
278+ if not self .RABBITMQ_URL :
279+ return
280+
281+ try :
282+ msg = gossipd .parse (raw )
283+ if msg is None :
284+ return
285+ except Exception as e :
286+ logging .warning (f"Could not parse gossip message: { e } " )
287+ return
288+
289+ if not self .connection or not self .connection .is_open :
290+ try :
291+ plugin .log (f"connecting to message queue" )
292+ self .rabbitmq_connect ()
293+ except :
294+ raise Exception ("rabbitmq connection closed" )
295+
296+ for msg_type in [gossipd .ChannelUpdate ,
297+ gossipd .ChannelAnnouncement ,
298+ gossipd .NodeAnnouncement ]:
299+ if isinstance (msg , msg_type ):
300+ self .channel .basic_publish (exchange = 'router.gossip' ,
301+ # unused by fanout exchange
302+ routing_key = '' ,
303+ body = serialize (raw , self .node_id ,
304+ self .network ))
305+ return
306+
184307
185308
186309@plugin .init ()
0 commit comments