diff --git a/include/libp2p/protocol/gossip/gossip.hpp b/include/libp2p/protocol/gossip/gossip.hpp index 0b289db4d..b8a13f912 100644 --- a/include/libp2p/protocol/gossip/gossip.hpp +++ b/include/libp2p/protocol/gossip/gossip.hpp @@ -36,6 +36,25 @@ namespace libp2p { } // namespace libp2p namespace libp2p::protocol::gossip { + /// RPC limits to control message processing + struct RPCLimits { + /// Maximum subscriptions that will be processed in a single message and the + /// rest will be ignored + size_t max_subscriptions = 5000; + + /// Maximum messages that will be processed in a single message and the rest + /// will be ignored + size_t max_ihave_messages = 5000; + size_t max_iwant_messages = 5000; + size_t max_graft_messages = 5000; + size_t max_prune_messages = 5000; + + /// Maximum message ids that will be processed in a single message and the + /// rest will be ignored + size_t max_ihave_message_ids = 5000; + size_t max_iwant_message_ids = 5000; + size_t max_prune_peer_infos = 16; + }; /// Gossip pub-sub protocol config struct Config { @@ -91,6 +110,9 @@ namespace libp2p::protocol::gossip { /// Sign published messages bool sign_messages = false; + + /// RPC Parsing limits + std::shared_ptr rpc_limits = std::make_shared(); }; using TopicId = std::string; diff --git a/src/protocol/gossip/impl/message_parser.cpp b/src/protocol/gossip/impl/message_parser.cpp index 15fc22924..8c214c3b2 100644 --- a/src/protocol/gossip/impl/message_parser.cpp +++ b/src/protocol/gossip/impl/message_parser.cpp @@ -12,6 +12,8 @@ #include +#include "peer_context.hpp" + namespace libp2p::protocol::gossip { namespace { @@ -23,7 +25,8 @@ namespace libp2p::protocol::gossip { // need to define default ctor/dtor here in translation unit due to unique_ptr // to type which is incomplete in header - MessageParser::MessageParser() = default; + MessageParser::MessageParser(std::shared_ptr limits) + : limits_(std::move(limits)) {} MessageParser::~MessageParser() = default; bool MessageParser::parse(BytesIn bytes) { @@ -42,49 +45,91 @@ namespace libp2p::protocol::gossip { return; } + size_t curr_subscriptions = 0; + for (const auto &s : pb_msg_->subscriptions()) { if (!s.has_subscribe() || !s.has_topicid()) { continue; } + + if (curr_subscriptions == limits_->max_subscriptions) { + break; + } receiver.onSubscription(from, s.subscribe(), s.topicid()); + + curr_subscriptions++; } if (pb_msg_->has_control()) { const auto &c = pb_msg_->control(); + size_t curr_ihave_messages = 0; + size_t curr_iwant_messages = 0; + size_t curr_graft_messages = 0; + size_t curr_prune_messages = 0; for (const auto &h : c.ihave()) { + if (curr_ihave_messages == limits_->max_ihave_messages) { + break; + } + size_t curr_ihave_message_ids = 0; if (!h.has_topicid() || h.messageids_size() == 0) { continue; } const TopicId &topic = h.topicid(); for (const auto &msg_id : h.messageids()) { + if (curr_ihave_message_ids == limits_->max_ihave_message_ids) { + break; + } if (msg_id.empty()) { continue; } receiver.onIHave(from, topic, fromString(msg_id)); + + curr_ihave_message_ids++; } + + curr_ihave_messages++; } for (const auto &w : c.iwant()) { + if (curr_iwant_messages == limits_->max_iwant_messages) { + break; + } + size_t curr_iwant_message_ids = 0; if (w.messageids_size() == 0) { continue; } for (const auto &msg_id : w.messageids()) { + if (curr_iwant_message_ids == limits_->max_iwant_message_ids) { + break; + } if (msg_id.empty()) { continue; } receiver.onIWant(from, fromString(msg_id)); + + curr_iwant_message_ids++; } + curr_iwant_messages++; } for (const auto &gr : c.graft()) { + if (curr_graft_messages == limits_->max_graft_messages) { + break; + } if (!gr.has_topicid()) { continue; } receiver.onGraft(from, gr.topicid()); + + curr_graft_messages++; } for (const auto &pr : c.prune()) { + if (curr_prune_messages == limits_->max_prune_messages) { + break; + } + size_t curr_prune_peer_infos = 0; if (!pr.has_topicid()) { continue; } @@ -95,13 +140,20 @@ namespace libp2p::protocol::gossip { log()->debug( "prune backoff={}, {} peers", backoff_time, pr.peers_size()); for (const auto &peer : pr.peers()) { + if (curr_prune_peer_infos == limits_->max_prune_peer_infos) { + break; + } // TODO(artem): meshsub 1.1.0 + signed peer records NYI log()->debug("peer id size={}, signed peer record size={}", peer.peerid().size(), peer.signedpeerrecord().size()); + + curr_prune_peer_infos++; } receiver.onPrune(from, pr.topicid(), backoff_time); + + curr_prune_messages++; } } diff --git a/src/protocol/gossip/impl/message_parser.hpp b/src/protocol/gossip/impl/message_parser.hpp index 6523004c5..d43ab0e86 100644 --- a/src/protocol/gossip/impl/message_parser.hpp +++ b/src/protocol/gossip/impl/message_parser.hpp @@ -8,6 +8,8 @@ #include "common.hpp" +#include + namespace pubsub::pb { // protobuf message forward declaration class RPC; @@ -20,7 +22,7 @@ namespace libp2p::protocol::gossip { /// Protobuf message parser. class MessageParser { public: - MessageParser(); + MessageParser(std::shared_ptr limits); ~MessageParser(); @@ -33,6 +35,8 @@ namespace libp2p::protocol::gossip { private: /// Parsed protobuf message std::unique_ptr pb_msg_; + /// Initialised RPC Limits + std::shared_ptr limits_; }; } // namespace libp2p::protocol::gossip diff --git a/src/protocol/gossip/impl/stream.cpp b/src/protocol/gossip/impl/stream.cpp index 2b18a9f47..90ada166e 100644 --- a/src/protocol/gossip/impl/stream.cpp +++ b/src/protocol/gossip/impl/stream.cpp @@ -28,6 +28,7 @@ namespace libp2p::protocol::gossip { std::shared_ptr stream, PeerContextPtr peer) : stream_id_(stream_id), + config_{config}, timeout_(config.rw_timeout_msec), scheduler_(scheduler), max_message_size_(config.max_message_size), @@ -109,7 +110,7 @@ namespace libp2p::protocol::gossip { peer_->str, stream_id_); - MessageParser parser; + MessageParser parser{config_.rpc_limits}; if (!parser.parse(*read_buffer_)) { feedback_(peer_, Error::MESSAGE_PARSE_ERROR); return; diff --git a/src/protocol/gossip/impl/stream.hpp b/src/protocol/gossip/impl/stream.hpp index cc5b317be..a04915f95 100644 --- a/src/protocol/gossip/impl/stream.hpp +++ b/src/protocol/gossip/impl/stream.hpp @@ -57,6 +57,7 @@ namespace libp2p::protocol::gossip { void asyncPostError(Error error); [[maybe_unused]] const size_t stream_id_; + const Config &config_; const Time timeout_; basic::Scheduler &scheduler_; const size_t max_message_size_; diff --git a/test/libp2p/protocol/gossip/CMakeLists.txt b/test/libp2p/protocol/gossip/CMakeLists.txt index dcd6aba15..4b0db7854 100644 --- a/test/libp2p/protocol/gossip/CMakeLists.txt +++ b/test/libp2p/protocol/gossip/CMakeLists.txt @@ -20,3 +20,11 @@ target_link_libraries(gossip_local_subs_test p2p_gossip p2p_testutil_peer ) + +addtest(gossip_rpc_limits_test + gossip_rpc_limits_test.cpp + ) +target_link_libraries(gossip_rpc_limits_test + p2p_gossip + p2p_testutil_peer + ) diff --git a/test/libp2p/protocol/gossip/gossip_rpc_limits_test.cpp b/test/libp2p/protocol/gossip/gossip_rpc_limits_test.cpp new file mode 100644 index 000000000..6658bfce8 --- /dev/null +++ b/test/libp2p/protocol/gossip/gossip_rpc_limits_test.cpp @@ -0,0 +1,259 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "protocol/gossip/protobuf/rpc.pb.h" +#include "src/protocol/gossip/impl/common.hpp" +#include "src/protocol/gossip/impl/message_parser.hpp" +#include "src/protocol/gossip/impl/message_receiver.hpp" +#include "src/protocol/gossip/impl/peer_context.hpp" +#include "testutil/libp2p/peer.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace g = libp2p::protocol::gossip; + +namespace { + class TestMessageReceiver : public g::MessageReceiver { + public: + ~TestMessageReceiver() = default; + + void onSubscription(const g::PeerContextPtr &from, + bool subscribe, + const g::TopicId &topic) { + subscriptions_processed++; + } + + void onIHave(const g::PeerContextPtr &from, + const g::TopicId &topic, + const g::MessageId &msg_id) { + if (!ihave_topicIds_processed.contains(topic)) { + ihave_topicIds_processed.insert(topic); + ihave_messages_processed++; + } + ihave_message_ids_processed[topic]++; + } + + void onIWant(const g::PeerContextPtr &from, const g::MessageId &msg_id) { + total_iwant_message_Ids_processed++; + } + + void onGraft(const g::PeerContextPtr &from, const g::TopicId &topic) { + graft_processed++; + } + + void onPrune(const g::PeerContextPtr &from, + const g::TopicId &topic, + uint64_t backoff_time) { + prune_processed++; + } + + void onTopicMessage(const g::PeerContextPtr &from, + g::TopicMessage::Ptr msg) {} + + void onMessageEnd(const g::PeerContextPtr &from) {} + + size_t subscriptions_processed = 0; + + /// IHAVE Control messages can have multiple message ids and each control + /// message group can be uniquely identified using a topic Id + size_t ihave_messages_processed = 0; + std::set ihave_topicIds_processed{}; + std::map ihave_message_ids_processed{}; + + /// IWANT Control messages can have multiple message ids but each control + /// message group cannot be uniquely identified + size_t total_iwant_message_Ids_processed = 0; + + size_t graft_processed = 0; + + /// NOTE: Add Peer Info count after MessageParser Update + /// PRUNE messages can have multiple peer infos, but it's currently not + /// implemented in the parser. + size_t prune_processed = 0; + }; + + void serializeAndDispatch(pubsub::pb::RPC &rpc, + g::MessageParser &parser, + TestMessageReceiver &receiver) { + std::string serialized; + rpc.SerializeToString(&serialized); + libp2p::BytesIn pubsub_message{ + reinterpret_cast(serialized.data()), + serialized.size()}; + parser.parse(pubsub_message); + const g::PeerContextPtr mock_context_ptr = + std::make_shared(testutil::randomPeerId()); + parser.dispatch(mock_context_ptr, receiver); + } +} // namespace + +/** + * @given Limit on subscriptions in RPC + * @when we parse the RPC message + * @then subscriptions after the limit has been reached are ignored + */ +TEST(Gossip, RPCSubscriptionLimit) { + srand(0); + g::RPCLimits limits{}; + limits.max_subscriptions = rand() % 10; + + pubsub::pb::RPC rpc; + for (int i = 0; i < 100; i++) { + auto *sub = rpc.add_subscriptions(); + sub->set_subscribe(i % 2); + sub->set_topicid(std::to_string(i)); + } + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + ASSERT_EQ(receiver.subscriptions_processed, limits.max_subscriptions); +} + +/** + * @given Limit on IHAVE control messages and IHAVE message ids on RPC + * @when we parse the RPC message + * @then messages and message ids after the limit are ignored + */ +TEST(Gossip, RPCIHaveLimit) { + srand(0); + g::RPCLimits limits{}; + limits.max_ihave_messages = rand() % 10; + limits.max_ihave_message_ids = rand() % 10; + + pubsub::pb::RPC rpc; + auto *control = rpc.mutable_control(); + for (int i = 0; i < 100; i++) { + auto *ihave = control->add_ihave(); + ihave->set_topicid(std::to_string(i)); + for (int j = 0; j < 100; j++) { + ihave->add_messageids(std::to_string(j)); + } + } + + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + ASSERT_EQ(receiver.ihave_messages_processed, limits.max_ihave_messages); + for (size_t i = 0; i < limits.max_ihave_messages; i++) { + ASSERT_EQ(receiver.ihave_message_ids_processed[std::to_string(i)], + limits.max_ihave_message_ids); + } + for (int i = limits.max_ihave_messages; i < 100; i++) { + ASSERT_EQ(receiver.ihave_message_ids_processed[std::to_string(i)], 0); + } +} + +/** + * @given Limit on IWANT control messages and IWANT message ids on RPC + * @when we parse the RPC message + * @then messages and message ids after the limit are ignored + */ +TEST(Gossip, RPCIWantLimit) { + srand(0); + g::RPCLimits limits{}; + limits.max_iwant_messages = rand() % 10; + limits.max_iwant_message_ids = rand() % 10; + + pubsub::pb::RPC rpc; + auto *control = rpc.mutable_control(); + for (int i = 0; i < 100; i++) { + auto *iwant = control->add_iwant(); + for (int j = 0; j < 100; j++) { + iwant->add_messageids(std::to_string(j)); + } + } + + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + ASSERT_EQ(receiver.total_iwant_message_Ids_processed, + limits.max_iwant_messages * limits.max_iwant_message_ids); +} + +/** + * @given Limit on GRAFT control messages on RPC + * @when we parse the RPC message + * @then messages after the limit are ignored + */ +TEST(Gossip, RPCGraftLimit) { + srand(0); + g::RPCLimits limits{}; + limits.max_graft_messages = rand() % 10; + + pubsub::pb::RPC rpc; + auto *control = rpc.mutable_control(); + for (int i = 0; i < 100; i++) { + auto *graft = control->add_graft(); + graft->set_topicid(std::to_string(i)); + } + + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + + ASSERT_EQ(receiver.graft_processed, limits.max_graft_messages); +} + +/** + * @given Limit on PRUNE control messages on RPC + * @when we parse the RPC message + * @then messages after the limit are ignored + */ +TEST(Gossip, RPCPruneLimit) { + // Logging system is required as PRUNE control message parsing logs + const std::string logger_config(R"( +# ---------------- +sinks: + - name: console + type: console + color: true +groups: + - name: main + sink: console + level: debug + children: + - name: libp2p +# ---------------- + )"); + + auto logging_system = std::make_shared( + std::make_shared( + // Original LibP2P logging config + std::make_shared(), + // Additional logging config for application + logger_config)); + auto result = logging_system->configure(); + libp2p::log::setLoggingSystem(logging_system); + + srand(0); + g::RPCLimits limits{}; + limits.max_prune_messages = rand() % 10; + + pubsub::pb::RPC rpc; + auto *control = rpc.mutable_control(); + for (int i = 0; i < 100; i++) { + auto *prune = control->add_prune(); + prune->set_topicid(std::to_string(i)); + } + + g::MessageParser parser{std::make_shared(limits)}; + TestMessageReceiver receiver; + serializeAndDispatch(rpc, parser, receiver); + + ASSERT_EQ(receiver.prune_processed, limits.max_prune_messages); +}