Skip to content

Commit d89840b

Browse files
authored
Merge pull request #144 from accelerated/header_fix
Header fixes and header copy considerations
2 parents a9a0693 + 25c2eaa commit d89840b

File tree

8 files changed

+179
-43
lines changed

8 files changed

+179
-43
lines changed

include/cppkafka/buffer.h

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ class CPPKAFKA_API Buffer {
8181
throw Exception("Invalid buffer configuration");
8282
}
8383
}
84+
85+
/**
86+
* Constructs a buffer from two iterators in the range [first,last)
87+
*
88+
* \param first An iterator to the start of data
89+
* \param last An iterator to the end of data (not included)
90+
*/
91+
template <typename Iter>
92+
Buffer(const Iter first, const Iter last)
93+
: Buffer(&*first, std::distance(first, last)) {
94+
}
8495

8596
/**
8697
* Constructs a buffer from a vector
@@ -93,7 +104,9 @@ class CPPKAFKA_API Buffer {
93104
static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)");
94105
}
95106

96-
// Don't allow construction from temporary vectors
107+
/**
108+
* Don't allow construction from temporary vectors
109+
*/
97110
template <typename T>
98111
Buffer(std::vector<T>&& data) = delete;
99112

@@ -108,7 +121,9 @@ class CPPKAFKA_API Buffer {
108121
static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)");
109122
}
110123

111-
// Don't allow construction from temporary arrays
124+
/**
125+
* Don't allow construction from temporary arrays
126+
*/
112127
template <typename T, size_t N>
113128
Buffer(std::array<T, N>&& data) = delete;
114129

@@ -120,9 +135,11 @@ class CPPKAFKA_API Buffer {
120135
*/
121136
Buffer(const std::string& data);
122137

123-
// Don't allow construction from temporary strings
138+
/**
139+
* Don't allow construction from temporary strings
140+
*/
124141
Buffer(std::string&&) = delete;
125-
142+
126143
Buffer(const Buffer&) = delete;
127144
Buffer(Buffer&&) = default;
128145
Buffer& operator=(const Buffer&) = delete;

include/cppkafka/clonable_ptr.h

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ class ClonablePtr {
6060
* \param rhs The pointer to be copied
6161
*/
6262
ClonablePtr(const ClonablePtr& rhs)
63-
: handle_(rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
64-
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter())),
65-
cloner_(rhs.cloner_) {
63+
: handle_(std::unique_ptr<T, Deleter>(rhs.try_clone(), rhs.get_deleter())),
64+
cloner_(rhs.get_cloner()) {
6665

6766
}
6867

@@ -72,11 +71,10 @@ class ClonablePtr {
7271
* \param rhs The pointer to be copied
7372
*/
7473
ClonablePtr& operator=(const ClonablePtr& rhs) {
75-
if (this == &rhs) {
76-
return *this;
74+
if (this != &rhs) {
75+
handle_ = std::unique_ptr<T, Deleter>(rhs.try_clone(), rhs.get_deleter());
76+
cloner_ = rhs.get_cloner();
7777
}
78-
handle_ = rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
79-
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter());
8078
return *this;
8179
}
8280

@@ -98,13 +96,38 @@ class ClonablePtr {
9896
return handle_.release();
9997
}
10098

99+
/**
100+
* \brief Reset the internal pointer to a new one
101+
*/
102+
void reset(T* ptr) {
103+
handle_.reset(ptr);
104+
}
105+
106+
/**
107+
* \brief Get the deleter
108+
*/
109+
const Deleter& get_deleter() const {
110+
return handle_.get_deleter();
111+
}
112+
113+
/**
114+
* \brief Get the cloner
115+
*/
116+
const Cloner& get_cloner() const {
117+
return cloner_;
118+
}
119+
101120
/**
102121
* \brief Indicates whether this ClonablePtr instance is valid (not null)
103122
*/
104123
explicit operator bool() const {
105124
return static_cast<bool>(handle_);
106125
}
107126
private:
127+
T* try_clone() const {
128+
return cloner_ ? cloner_(get()) : get();
129+
}
130+
108131
std::unique_ptr<T, Deleter> handle_;
109132
Cloner cloner_;
110133
};

include/cppkafka/header_list.h

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ namespace cppkafka {
5151
template <typename HeaderType>
5252
class HeaderList {
5353
public:
54+
template <typename OtherHeaderType>
55+
friend class HeaderList;
56+
5457
using BufferType = typename HeaderType::ValueType;
5558
using Iterator = HeaderIterator<HeaderType>;
5659
/**
@@ -75,6 +78,16 @@ class HeaderList {
7578
*/
7679
explicit HeaderList(rd_kafka_headers_t* handle);
7780

81+
/**
82+
* \brief Create a header list from another header list type
83+
* \param other The other list
84+
*/
85+
template <typename OtherHeaderType>
86+
HeaderList(const HeaderList<OtherHeaderType>& other);
87+
88+
template <typename OtherHeaderType>
89+
HeaderList(HeaderList<OtherHeaderType>&& other);
90+
7891
/**
7992
* \brief Add a header to the list. This translates to rd_kafka_header_add().
8093
* \param header The header.
@@ -162,7 +175,6 @@ class HeaderList {
162175
private:
163176
struct NonOwningTag { };
164177
static void dummy_deleter(rd_kafka_headers_t*) {}
165-
static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast<rd_kafka_headers_t*>(handle); }
166178

167179
using HandlePtr = ClonablePtr<rd_kafka_headers_t, decltype(&rd_kafka_headers_destroy),
168180
decltype(&rd_kafka_headers_copy)>;
@@ -205,18 +217,32 @@ HeaderList<HeaderType>::HeaderList()
205217
template <typename HeaderType>
206218
HeaderList<HeaderType>::HeaderList(size_t reserve)
207219
: handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) {
208-
220+
assert(reserve);
209221
}
210222

211223
template <typename HeaderType>
212224
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle)
213225
: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy
214-
226+
assert(handle);
215227
}
216228

217229
template <typename HeaderType>
218230
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle, NonOwningTag)
219-
: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy.
231+
: handle_(handle, &dummy_deleter, nullptr) { //if we don't own the header list, we forward the handle on copy.
232+
assert(handle);
233+
}
234+
235+
template <typename HeaderType>
236+
template <typename OtherHeaderType>
237+
HeaderList<HeaderType>::HeaderList(const HeaderList<OtherHeaderType>& other)
238+
: handle_(other.handle_) {
239+
240+
}
241+
242+
template <typename HeaderType>
243+
template <typename OtherHeaderType>
244+
HeaderList<HeaderType>::HeaderList(HeaderList<OtherHeaderType>&& other)
245+
: handle_(std::move(other.handle_)) {
220246

221247
}
222248

@@ -254,7 +280,7 @@ HeaderType HeaderList<HeaderType>::at(size_t index) const {
254280
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
255281
throw Exception(error.to_string());
256282
}
257-
return HeaderType(name, BufferType(value, size));
283+
return HeaderType(name, BufferType(value, value + size));
258284
}
259285

260286
template <typename HeaderType>
@@ -269,8 +295,7 @@ HeaderType HeaderList<HeaderType>::back() const {
269295

270296
template <typename HeaderType>
271297
size_t HeaderList<HeaderType>::size() const {
272-
assert(handle_);
273-
return rd_kafka_header_cnt(handle_.get());
298+
return handle_ ? rd_kafka_header_cnt(handle_.get()) : 0;
274299
}
275300

276301
template <typename HeaderType>
@@ -281,18 +306,13 @@ bool HeaderList<HeaderType>::empty() const {
281306
template <typename HeaderType>
282307
typename HeaderList<HeaderType>::Iterator
283308
HeaderList<HeaderType>::begin() const {
284-
assert(handle_);
285-
if (empty()) {
286-
return end();
287-
}
288-
return Iterator(make_non_owning(handle_.get()), 0);
309+
return Iterator(*this, 0);
289310
}
290311

291312
template <typename HeaderType>
292313
typename HeaderList<HeaderType>::Iterator
293314
HeaderList<HeaderType>::end() const {
294-
assert(handle_);
295-
return Iterator(make_non_owning(handle_.get()), size());
315+
return Iterator(*this, size());
296316
}
297317

298318
template <typename HeaderType>

include/cppkafka/header_list_iterator.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,9 @@ class HeaderIterator {
151151
}
152152

153153
private:
154-
HeaderIterator(HeaderListType headers,
154+
HeaderIterator(const HeaderListType& headers,
155155
size_t index)
156-
: header_list_(std::move(headers)),
157-
header_(index == header_list_.size() ? HeaderType() : header_list_.at(index)),
156+
: header_list_(headers),
158157
index_(index) {
159158
}
160159

@@ -169,7 +168,7 @@ class HeaderIterator {
169168
other.get_value().get_size()));
170169
}
171170

172-
HeaderListType header_list_;
171+
const HeaderListType& header_list_;
173172
HeaderType header_;
174173
size_t index_;
175174
};

include/cppkafka/message.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,20 @@ class CPPKAFKA_API Message {
127127
}
128128

129129
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
130+
/**
131+
* \brief Sets the message's header list.
132+
* \note This calls rd_kafka_message_set_headers.
133+
*/
134+
void set_header_list(const HeaderListType& headers) {
135+
assert(handle_);
136+
if (!headers) {
137+
return; //nothing to set
138+
}
139+
rd_kafka_headers_t* handle_copy = rd_kafka_headers_copy(headers.get_handle());
140+
rd_kafka_message_set_headers(handle_.get(), handle_copy);
141+
header_list_ = HeaderListType::make_non_owning(handle_copy);
142+
}
143+
130144
/**
131145
* \brief Gets the message's header list
132146
*/

0 commit comments

Comments
 (0)