diff --git a/connectd/connectd.c b/connectd/connectd.c index 93fea082c98e..44f81b6c2518 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -2401,6 +2401,10 @@ static struct io_plan *recv_req(struct io_conn *conn, add_scid_map(daemon, msg); goto out; + case WIRE_CONNECTD_CUSTOMMSG_IN_COMPLETE: + custommsg_completed(daemon, msg); + goto out; + case WIRE_CONNECTD_DEV_MEMLEAK: if (daemon->developer) { dev_connect_memleak(daemon, msg); diff --git a/connectd/connectd_wire.csv b/connectd/connectd_wire.csv index 4b4d54577265..7b14915915ab 100644 --- a/connectd/connectd_wire.csv +++ b/connectd/connectd_wire.csv @@ -194,6 +194,10 @@ msgdata,connectd_custommsg_in,id,node_id, msgdata,connectd_custommsg_in,msg_len,u16, msgdata,connectd_custommsg_in,msg,u8,msg_len +# We got that custommsg, thankyou, it was delightful. +msgtype,connectd_custommsg_in_complete,2111 +msgdata,connectd_custommsg_in_complete,id,node_id, + # A custom message that the lightningd tells us to send to the peer. msgtype,connectd_custommsg_out,2011 msgdata,connectd_custommsg_out,id,node_id, diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 1cbc98f1e90e..a59b7ad33476 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -732,6 +732,14 @@ static void handle_pong_in(struct peer *peer, const u8 *msg) abort(); } +/* Various cases where we don't send the msg to a gossipd, we want to + * do IO logging! */ +static void log_peer_io(const struct peer *peer, const u8 *msg) +{ + status_peer_io(LOG_IO_IN, &peer->id, msg); + io_wake(peer->peer_outq); +} + /* Forward to gossipd */ static void handle_gossip_in(struct peer *peer, const u8 *msg) { @@ -744,7 +752,7 @@ static void handle_gossip_in(struct peer *peer, const u8 *msg) gmsg = towire_gossipd_recv_gossip(NULL, &peer->id, msg); /* gossipd doesn't log IO, so we log it here. */ - status_peer_io(LOG_IO_IN, &peer->id, msg); + log_peer_io(peer, msg); daemon_conn_send(peer->daemon->gossipd, take(gmsg)); } @@ -848,6 +856,22 @@ static bool handle_custommsg(struct daemon *daemon, return true; } +void custommsg_completed(struct daemon *daemon, const u8 *msg) +{ + struct node_id id; + const struct peer *peer; + + if (!fromwire_connectd_custommsg_in_complete(msg, &id)) + master_badmsg(WIRE_CONNECTD_CUSTOMMSG_IN_COMPLETE, msg); + + /* If it's still around, log it. */ + peer = peer_htable_get(daemon->peers, &id); + if (peer) { + status_peer_debug(&peer->id, "custommsg processing finished"); + log_peer_io(peer, msg); + } +} + /* We handle pings and gossip messages. */ static bool handle_message_locally(struct peer *peer, const u8 *msg) { @@ -857,19 +881,19 @@ static bool handle_message_locally(struct peer *peer, const u8 *msg) gossip_rcvd_filter_add(peer->gs.grf, msg); if (type == WIRE_GOSSIP_TIMESTAMP_FILTER) { - status_peer_io(LOG_IO_IN, &peer->id, msg); + log_peer_io(peer, msg); handle_gossip_timestamp_filter_in(peer, msg); return true; } else if (type == WIRE_PING) { - status_peer_io(LOG_IO_IN, &peer->id, msg); + log_peer_io(peer, msg); handle_ping_in(peer, msg); return true; } else if (type == WIRE_PONG) { - status_peer_io(LOG_IO_IN, &peer->id, msg); + log_peer_io(peer, msg); handle_pong_in(peer, msg); return true; } else if (type == WIRE_ONION_MESSAGE) { - status_peer_io(LOG_IO_IN, &peer->id, msg); + log_peer_io(peer, msg); handle_onion_message(peer->daemon, peer, msg); return true; } else if (type == WIRE_QUERY_CHANNEL_RANGE) { @@ -1267,7 +1291,6 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, /* If we swallow this, just try again. */ if (handle_message_locally(peer, decrypted)) { /* Make sure to update peer->peer_in_lastmsg so we blame correct msg! */ - io_wake(peer->peer_outq); goto out; } diff --git a/connectd/multiplex.h b/connectd/multiplex.h index 1a20143a6764..f42f7cb76ea2 100644 --- a/connectd/multiplex.h +++ b/connectd/multiplex.h @@ -28,6 +28,9 @@ void send_manual_ping(struct daemon *daemon, const u8 *msg); /* When lightningd says to send a custom message (from a plugin) */ void send_custommsg(struct daemon *daemon, const u8 *msg); +/* lightningd has finished with the custommsg */ +void custommsg_completed(struct daemon *daemon, const u8 *msg); + /* When lightningd says what custom messages we can recv */ void set_custommsgs(struct daemon *daemon, const u8 *msg); diff --git a/lightningd/connect_control.c b/lightningd/connect_control.c index ae0bde7a378e..09da14aa8948 100644 --- a/lightningd/connect_control.c +++ b/lightningd/connect_control.c @@ -351,6 +351,7 @@ void connect_succeeded(struct lightningd *ld, const struct peer *peer, } struct custommsg_payload { + struct lightningd *ld; struct node_id peer_id; u8 *msg; }; @@ -376,6 +377,11 @@ static bool custommsg_cb(struct custommsg_payload *payload, static void custommsg_final(struct custommsg_payload *payload STEALS) { + /* Note: on shutdown, ld->connectd can be NULL! */ + if (payload->ld->connectd) { + subd_send_msg(payload->ld->connectd, + take(towire_connectd_custommsg_in_complete(NULL, &payload->peer_id))); + } tal_steal(tmpctx, payload); } @@ -397,6 +403,7 @@ static void handle_custommsg_in(struct lightningd *ld, const u8 *msg) { struct custommsg_payload *p = tal(NULL, struct custommsg_payload); + p->ld = ld; if (!fromwire_connectd_custommsg_in(p, msg, &p->peer_id, &p->msg)) { log_broken(ld->log, "Malformed custommsg: %s", tal_hex(tmpctx, msg)); @@ -550,6 +557,7 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd case WIRE_CONNECTD_DEV_EXHAUST_FDS: case WIRE_CONNECTD_DEV_SET_MAX_SCIDS_ENCODE_SIZE: case WIRE_CONNECTD_SCID_MAP: + case WIRE_CONNECTD_CUSTOMMSG_IN_COMPLETE: /* This is a reply, so never gets through to here. */ case WIRE_CONNECTD_INIT_REPLY: case WIRE_CONNECTD_ACTIVATE_REPLY: diff --git a/lightningd/peer_control.c b/lightningd/peer_control.c index 2751769c2421..2ec8bea3689f 100644 --- a/lightningd/peer_control.c +++ b/lightningd/peer_control.c @@ -1322,13 +1322,11 @@ peer_connected_serialize(struct peer_connected_hook_payload *payload, json_object_start(stream, "peer"); json_add_node_id(stream, "id", &payload->peer_id); json_add_string(stream, "direction", payload->incoming ? "in" : "out"); - json_add_string( - stream, "addr", - fmt_wireaddr_internal(stream, &payload->addr)); + json_add_string(stream, "addr", + fmt_wireaddr_internal(tmpctx, &payload->addr)); if (payload->remote_addr) - json_add_string( - stream, "remote_addr", - fmt_wireaddr(stream, payload->remote_addr)); + json_add_string(stream, "remote_addr", + fmt_wireaddr(tmpctx, payload->remote_addr)); /* Since this is start of hook, peer is always in table! */ json_add_hex_talarr(stream, "features", peer_by_id(payload->ld, &payload->peer_id) diff --git a/tests/test_bookkeeper.py b/tests/test_bookkeeper.py index 5160261e40f1..d2a2e71457bb 100644 --- a/tests/test_bookkeeper.py +++ b/tests/test_bookkeeper.py @@ -23,10 +23,10 @@ def find_first_tag(evs, tag): return ev[0] -def check_events(node, channel_id, exp_events): +def check_events(node, channel_id, exp_events, alt_events=None): chan_events = [ev for ev in node.rpc.bkpr_listaccountevents()['events'] if ev['account'] == channel_id] stripped = [{k: d[k] for k in ('tag', 'credit_msat', 'debit_msat') if k in d} for d in chan_events] - assert stripped == exp_events + assert stripped == exp_events or stripped == alt_events @unittest.skipIf(TEST_NETWORK != 'regtest', "fixme: broadcast fails, dusty") @@ -399,7 +399,12 @@ def test_bookkeeping_missed_chans_pushed(node_factory, bitcoind): {'tag': 'pushed', 'credit_msat': 0, 'debit_msat': push_amt}, {'tag': 'onchain_fee', 'credit_msat': 4927000, 'debit_msat': 0}, {'tag': 'invoice', 'credit_msat': 0, 'debit_msat': invoice_msat}] - check_events(l1, channel_id, exp_events) + # We sometimes see onchain_fee first: + alt_events = [{'tag': 'channel_open', 'credit_msat': open_amt * 1000, 'debit_msat': 0}, + {'tag': 'onchain_fee', 'credit_msat': 4927000, 'debit_msat': 0}, + {'tag': 'pushed', 'credit_msat': 0, 'debit_msat': push_amt}, + {'tag': 'invoice', 'credit_msat': 0, 'debit_msat': invoice_msat}] + check_events(l1, channel_id, exp_events, alt_events) # l2 events exp_events = [{'tag': 'channel_open', 'credit_msat': 0, 'debit_msat': 0}, diff --git a/tests/test_closing.py b/tests/test_closing.py index e64295dc8065..0cc1c2b68a85 100644 --- a/tests/test_closing.py +++ b/tests/test_closing.py @@ -871,7 +871,8 @@ def test_channel_lease_post_expiry(node_factory, bitcoind, chainparams): bitcoind.generate_block(6) sync_blockheight(bitcoind, [l1, l2]) # make sure we're at the right place for the csv lock - l2.daemon.wait_for_log('Blockheight: SENT_ADD_ACK_COMMIT->RCVD_ADD_ACK_REVOCATION LOCAL now 115') + height = bitcoind.rpc.getblockchaininfo()['blocks'] + l2.daemon.wait_for_log(f'Blockheight: SENT_ADD_ACK_COMMIT->RCVD_ADD_ACK_REVOCATION LOCAL now {height}') # We need to give l1-l2 time to update their blockheights for i in range(0, 4000, 1000): @@ -980,7 +981,8 @@ def test_channel_lease_unilat_closes(node_factory, bitcoind): bitcoind.generate_block(2) sync_blockheight(bitcoind, [l1, l2, l3]) # make sure we're at the right place for the csv lock - l2.daemon.wait_for_log('Blockheight: SENT_ADD_ACK_COMMIT->RCVD_ADD_ACK_REVOCATION LOCAL now 110') + height = bitcoind.rpc.getblockchaininfo()['blocks'] + l2.daemon.wait_for_log(f'Blockheight: SENT_ADD_ACK_COMMIT->RCVD_ADD_ACK_REVOCATION LOCAL now {height}') l2.stop() # unilateral close channels l1<->l2 & l3<->l2 diff --git a/tests/test_connection.py b/tests/test_connection.py index 1d3ea7d1ad48..20fa6259fbae 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -4496,9 +4496,15 @@ def test_connect_ratelimit(node_factory, bitcoind): assert not l1.daemon.is_in_log('Unblocking for') l1.stop() - # Suspend the others, to make sure they cannot respond too fast. + + # Suspend the others' connectd, to make sure they cannot respond too fast. + connectd_pids = [] for n in nodes: - os.kill(n.daemon.proc.pid, signal.SIGSTOP) + log = n.daemon.is_in_log(' connectd: pid .*, msgfd') + m = re.search(r'connectd: pid (\d*),', log) + pid = int(m.groups()[0]) + connectd_pids.append(pid) + os.kill(pid, signal.SIGSTOP) try: l1.start() @@ -4509,13 +4515,13 @@ def test_connect_ratelimit(node_factory, bitcoind): * (len(nodes) - 1)) except Exception as err: # Resume, so pytest doesn't hang! - for n in nodes: - os.kill(n.daemon.proc.pid, signal.SIGCONT) + for p in connectd_pids: + os.kill(p, signal.SIGCONT) raise err # Resume them - for n in nodes: - os.kill(n.daemon.proc.pid, signal.SIGCONT) + for p in connectd_pids: + os.kill(p, signal.SIGCONT) # And now they're all connected wait_for(lambda: [p['connected'] for p in l1.rpc.listpeers()['peers']] == [True] * len(nodes)) diff --git a/tests/test_misc.py b/tests/test_misc.py index 20705782ded3..5baebbcdce42 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -4719,6 +4719,8 @@ def test_even_sendcustommsg(node_factory): l1.rpc.sendcustommsg(l2.info['id'], msg) l2.daemon.wait_for_log(r'\[IN\] {}'.format(msg)) l2.daemon.wait_for_log(r'allow_even_msgs.*Got message 43690') + # Make sure it *processes* before we remove plugin. + l2.daemon.wait_for_log(f"{l1.info['id']}-connectd: custommsg processing finished") # And nobody gets upset assert only_one(l1.rpc.listpeers(l2.info['id'])['peers'])['connected'] diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 3f862c2cd80d..2e18c2d9bace 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -4021,6 +4021,9 @@ def test_sql(node_factory, bitcoind): # Make sure l3 sees new channel wait_for(lambda: len(l3.rpc.listchannels(scid)['channels']) == 2) + # Make sure we have a node_announcement for l1 + wait_for(lambda: l2.rpc.listnodes(l1.info['id'])['nodes'] != []) + # This should create a forward through l2 l1.rpc.pay(l3.rpc.invoice(amount_msat=12300, label='inv1', description='description')['bolt11']) diff --git a/tests/test_renepay.py b/tests/test_renepay.py index 047d4a4d6b2f..65735426b0db 100644 --- a/tests/test_renepay.py +++ b/tests/test_renepay.py @@ -407,6 +407,7 @@ def test_hardmpp(node_factory): assert invoice["amount_received_msat"] >= Millisatoshi("1800000sat") +@pytest.mark.flaky(reruns=2) def test_self_pay(node_factory): l1, l2 = node_factory.line_graph(2, wait_for_announce=True)