Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions include/libp2p/protocol/gossip/gossip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ namespace libp2p {
} // namespace libp2p

namespace libp2p::protocol::gossip {
/// RPC limits to control message processing
struct RPCLimits {
/// Maximum subscriptions that will be processed in a single message and the
/// rest will be ignored
size_t max_subscriptions = 5000;

/// Maximum messages that will be processed in a single message and the rest
/// will be ignored
size_t max_ihave_messages = 5000;
size_t max_iwant_messages = 5000;
size_t max_graft_messages = 5000;
size_t max_prune_messages = 5000;

/// Maximum message ids that will be processed in a single message and the
/// rest will be ignored
size_t max_ihave_message_ids = 5000;
size_t max_iwant_message_ids = 5000;
size_t max_prune_peer_infos = 16;
};

/// Gossip pub-sub protocol config
struct Config {
Expand Down Expand Up @@ -91,6 +110,9 @@ namespace libp2p::protocol::gossip {

/// Sign published messages
bool sign_messages = false;

/// RPC Parsing limits
std::shared_ptr<RPCLimits> rpc_limits = std::make_shared<RPCLimits>();
};

using TopicId = std::string;
Expand Down
54 changes: 53 additions & 1 deletion src/protocol/gossip/impl/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include <generated/protocol/gossip/protobuf/rpc.pb.h>

#include "peer_context.hpp"

namespace libp2p::protocol::gossip {

namespace {
Expand All @@ -23,7 +25,8 @@ namespace libp2p::protocol::gossip {

// need to define default ctor/dtor here in translation unit due to unique_ptr
// to type which is incomplete in header
MessageParser::MessageParser() = default;
MessageParser::MessageParser(std::shared_ptr<RPCLimits> limits)
: limits_(std::move(limits)) {}
MessageParser::~MessageParser() = default;

bool MessageParser::parse(BytesIn bytes) {
Expand All @@ -42,49 +45,91 @@ namespace libp2p::protocol::gossip {
return;
}

size_t curr_subscriptions = 0;

for (const auto &s : pb_msg_->subscriptions()) {
if (!s.has_subscribe() || !s.has_topicid()) {
continue;
}

if (curr_subscriptions == limits_->max_subscriptions) {
break;
}
receiver.onSubscription(from, s.subscribe(), s.topicid());

curr_subscriptions++;
}

if (pb_msg_->has_control()) {
const auto &c = pb_msg_->control();
size_t curr_ihave_messages = 0;
size_t curr_iwant_messages = 0;
size_t curr_graft_messages = 0;
size_t curr_prune_messages = 0;

for (const auto &h : c.ihave()) {
if (curr_ihave_messages == limits_->max_ihave_messages) {
break;
}
size_t curr_ihave_message_ids = 0;
if (!h.has_topicid() || h.messageids_size() == 0) {
continue;
}
const TopicId &topic = h.topicid();
for (const auto &msg_id : h.messageids()) {
if (curr_ihave_message_ids == limits_->max_ihave_message_ids) {
break;
}
if (msg_id.empty()) {
continue;
}
receiver.onIHave(from, topic, fromString(msg_id));

curr_ihave_message_ids++;
}

curr_ihave_messages++;
}

for (const auto &w : c.iwant()) {
if (curr_iwant_messages == limits_->max_iwant_messages) {
break;
}
size_t curr_iwant_message_ids = 0;
if (w.messageids_size() == 0) {
continue;
}
for (const auto &msg_id : w.messageids()) {
if (curr_iwant_message_ids == limits_->max_iwant_message_ids) {
break;
}
if (msg_id.empty()) {
continue;
}
receiver.onIWant(from, fromString(msg_id));

curr_iwant_message_ids++;
}
curr_iwant_messages++;
}

for (const auto &gr : c.graft()) {
if (curr_graft_messages == limits_->max_graft_messages) {
break;
}
if (!gr.has_topicid()) {
continue;
}
receiver.onGraft(from, gr.topicid());

curr_graft_messages++;
}

for (const auto &pr : c.prune()) {
if (curr_prune_messages == limits_->max_prune_messages) {
break;
}
size_t curr_prune_peer_infos = 0;
if (!pr.has_topicid()) {
continue;
}
Expand All @@ -95,13 +140,20 @@ namespace libp2p::protocol::gossip {
log()->debug(
"prune backoff={}, {} peers", backoff_time, pr.peers_size());
for (const auto &peer : pr.peers()) {
if (curr_prune_peer_infos == limits_->max_prune_peer_infos) {
break;
}
// TODO(artem): meshsub 1.1.0 + signed peer records NYI

log()->debug("peer id size={}, signed peer record size={}",
peer.peerid().size(),
peer.signedpeerrecord().size());

curr_prune_peer_infos++;
}
receiver.onPrune(from, pr.topicid(), backoff_time);

curr_prune_messages++;
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/protocol/gossip/impl/message_parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include "common.hpp"

#include <libp2p/protocol/gossip/gossip.hpp>

namespace pubsub::pb {
// protobuf message forward declaration
class RPC;
Expand All @@ -20,7 +22,7 @@ namespace libp2p::protocol::gossip {
/// Protobuf message parser.
class MessageParser {
public:
MessageParser();
MessageParser(std::shared_ptr<RPCLimits> limits);

~MessageParser();

Expand All @@ -33,6 +35,8 @@ namespace libp2p::protocol::gossip {
private:
/// Parsed protobuf message
std::unique_ptr<pubsub::pb::RPC> pb_msg_;
/// Initialised RPC Limits
std::shared_ptr<RPCLimits> limits_;
};

} // namespace libp2p::protocol::gossip
3 changes: 2 additions & 1 deletion src/protocol/gossip/impl/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace libp2p::protocol::gossip {
std::shared_ptr<connection::Stream> stream,
PeerContextPtr peer)
: stream_id_(stream_id),
config_{config},
timeout_(config.rw_timeout_msec),
scheduler_(scheduler),
max_message_size_(config.max_message_size),
Expand Down Expand Up @@ -109,7 +110,7 @@ namespace libp2p::protocol::gossip {
peer_->str,
stream_id_);

MessageParser parser;
MessageParser parser{config_.rpc_limits};
if (!parser.parse(*read_buffer_)) {
feedback_(peer_, Error::MESSAGE_PARSE_ERROR);
return;
Expand Down
1 change: 1 addition & 0 deletions src/protocol/gossip/impl/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace libp2p::protocol::gossip {
void asyncPostError(Error error);

[[maybe_unused]] const size_t stream_id_;
const Config &config_;
const Time timeout_;
basic::Scheduler &scheduler_;
const size_t max_message_size_;
Expand Down
8 changes: 8 additions & 0 deletions test/libp2p/protocol/gossip/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ target_link_libraries(gossip_local_subs_test
p2p_gossip
p2p_testutil_peer
)

addtest(gossip_rpc_limits_test
gossip_rpc_limits_test.cpp
)
target_link_libraries(gossip_rpc_limits_test
p2p_gossip
p2p_testutil_peer
)
Loading
Loading