Skip to content

Commit e88b487

Browse files
authored
Merge pull request #766 from estolfo/RUBY-1104-gc-cursor
RUBY-1104 Add CursorReaper for periodically sending kill cursor operations
2 parents cf89c6a + 467305f commit e88b487

File tree

8 files changed

+605
-7
lines changed

8 files changed

+605
-7
lines changed

lib/mongo/cluster.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
require 'mongo/cluster/topology'
16+
require 'mongo/cluster/cursor_reaper'
1617

1718
module Mongo
1819

@@ -42,6 +43,7 @@ class Cluster
4243
attr_reader :topology
4344

4445
def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown?
46+
def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor
4547

4648
# Determine if this cluster of servers is equal to another object. Checks the
4749
# servers currently in the cluster, not what was configured.
@@ -113,6 +115,10 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
113115
subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))
114116

115117
seeds.each{ |seed| add(seed) }
118+
119+
@cursor_reaper = CursorReaper.new
120+
@cursor_reaper.run!
121+
116122
ObjectSpace.define_finalizer(self, self.class.finalize(pools))
117123
end
118124

@@ -130,6 +136,8 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
130136
# @since 2.2.0
131137
def self.finalize(pools)
132138
proc do
139+
begin; @cursor_reaper.kill_cursors; rescue; end
140+
@cursor_reaper.stop!
133141
pools.values.each do |pool|
134142
pool.disconnect!
135143
end
@@ -288,6 +296,8 @@ def servers
288296
#
289297
# @since 2.1.0
290298
def disconnect!
299+
begin; @cursor_reaper.kill_cursors; rescue; end
300+
@cursor_reaper.stop!
291301
@servers.each { |server| server.disconnect! } and true
292302
end
293303

@@ -301,7 +311,8 @@ def disconnect!
301311
# @since 2.1.0
302312
def reconnect!
303313
scan!
304-
servers.each { |server| server.reconnect! } and true
314+
servers.each { |server| server.reconnect! }
315+
@cursor_reaper.restart! and true
305316
end
306317

307318
# Add hosts in a description to the cluster.

lib/mongo/cluster/cursor_reaper.rb

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
# Copyright (C) 2014-2015 MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require 'set'
16+
17+
module Mongo
18+
class Cluster
19+
20+
# A manager that sends kill cursors operations at regular intervals to close
21+
# cursors that have been garbage collected without being exhausted.
22+
#
23+
# @since 2.3.0
24+
class CursorReaper
25+
extend Forwardable
26+
include Retryable
27+
28+
# The default time interval for the cursor reaper to send pending kill cursors operations.
29+
#
30+
# @since 2.3.0
31+
FREQUENCY = 1.freeze
32+
33+
# Create a cursor reaper.
34+
#
35+
# @example Create a CursorReaper.
36+
# Mongo::Cluster::CursorReaper.new(cluster)
37+
#
38+
# @api private
39+
#
40+
# @since 2.3.0
41+
def initialize
42+
@to_kill = {}
43+
@active_cursors = Set.new
44+
@mutex = Mutex.new
45+
end
46+
47+
# Start the cursor reaper's thread.
48+
#
49+
# @example Start the cursor reaper's thread.
50+
# reaper.run!
51+
#
52+
# @api private
53+
#
54+
# @since 2.3.0
55+
def run!
56+
@thread && @thread.alive? ? @thread : start!
57+
end
58+
alias :restart! :run!
59+
60+
# Schedule a kill cursors operation to be eventually executed.
61+
#
62+
# @example Schedule a kill cursors operation.
63+
# cursor_reaper.schedule_kill_cursor(id, op_spec, server)
64+
#
65+
# @param [ Integer ] id The id of the cursor to kill.
66+
# @param [ Hash ] op_spec The spec for the kill cursors op.
67+
# @param [ Mongo::Server ] server The server to send the kill cursors operation to.
68+
#
69+
# @api private
70+
#
71+
# @since 2.3.0
72+
def schedule_kill_cursor(id, op_spec, server)
73+
@mutex.synchronize do
74+
if @active_cursors.include?(id)
75+
@to_kill[server] ||= Set.new
76+
@to_kill[server] << op_spec
77+
end
78+
end
79+
end
80+
81+
# Register a cursor id as active.
82+
#
83+
# @example Register a cursor as active.
84+
# cursor_reaper.register_cursor(id)
85+
#
86+
# @param [ Integer ] id The id of the cursor to register as active.
87+
#
88+
# @api private
89+
#
90+
# @since 2.3.0
91+
def register_cursor(id)
92+
if id && id > 0
93+
@mutex.synchronize do
94+
@active_cursors << id
95+
end
96+
end
97+
end
98+
99+
# Unregister a cursor id, indicating that it's no longer active.
100+
#
101+
# @example Unregister a cursor.
102+
# cursor_reaper.unregister_cursor(id)
103+
#
104+
# @param [ Integer ] id The id of the cursor to unregister.
105+
#
106+
# @api private
107+
#
108+
# @since 2.3.0
109+
def unregister_cursor(id)
110+
@mutex.synchronize do
111+
@active_cursors.delete(id)
112+
end
113+
end
114+
115+
# Stop the cursor reaper's thread.
116+
#
117+
# @example Stop the cursor reaper's thread.
118+
# reaper.stop!
119+
#
120+
# @api private
121+
#
122+
# @since 2.3.0
123+
def stop!
124+
@thread.kill && @thread.stop?
125+
end
126+
127+
# Execute all pending kill cursors operations.
128+
#
129+
# @example Execute pending kill cursors operations.
130+
# cursor_reaper.kill_cursors
131+
#
132+
# @api private
133+
#
134+
# @since 2.3.0
135+
def kill_cursors
136+
to_kill_copy = {}
137+
active_cursors_copy = []
138+
139+
@mutex.synchronize do
140+
to_kill_copy = @to_kill.dup
141+
active_cursors_copy = @active_cursors.dup
142+
@to_kill = {}
143+
end
144+
145+
to_kill_copy.each do |server, op_specs|
146+
op_specs.each do |op_spec|
147+
if server.features.find_command_enabled?
148+
Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a)
149+
if Cursor::Builder::KillCursorsCommand.get_cursors_list(op_spec).size > 0
150+
Operation::Commands::Command.new(op_spec).execute(server)
151+
end
152+
else
153+
Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a)
154+
if Cursor::Builder::OpKillCursors.get_cursors_list(op_spec).size > 0
155+
Operation::KillCursors.new(op_spec).execute(server)
156+
end
157+
end
158+
end
159+
end
160+
end
161+
162+
private
163+
164+
def start!
165+
@thread = Thread.new(FREQUENCY) do |i|
166+
loop do
167+
sleep(i)
168+
kill_cursors
169+
end
170+
end
171+
end
172+
end
173+
end
174+
end

lib/mongo/cursor.rb

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,31 @@ def initialize(view, result, server)
5757
@server = server
5858
@initial_result = result
5959
@remaining = limit if limited?
60+
@cursor_id = result.cursor_id
61+
register
62+
ObjectSpace.define_finalizer(self, self.class.finalize(result.cursor_id,
63+
cluster,
64+
kill_cursors_op_spec,
65+
server))
66+
end
67+
68+
69+
# Finalize the cursor for garbage collection. Schedules this cursor to be included
70+
# in a killCursors operation executed by the Cluster's CursorReaper.
71+
#
72+
# @example Finalize the cursor.
73+
# Cursor.finalize(id, cluster, op, server)
74+
#
75+
# @param [ Integer ] cursor_id The cursor's id.
76+
# @param [ Mongo::Cluster ] cluster The cluster associated with this cursor and its server.
77+
# @param [ Hash ] op_spec The killCursors operation specification.
78+
# @param [ Mongo::Server ] server The server to send the killCursors operation to.
79+
#
80+
# @return [ Proc ] The Finalizer.
81+
#
82+
# @since 2.3.0
83+
def self.finalize(cursor_id, cluster, op_spec, server)
84+
proc { cluster.schedule_kill_cursor(cursor_id, op_spec, server) }
6085
end
6186

6287
# Get a human-readable string representation of +Cursor+.
@@ -173,16 +198,25 @@ def get_more_operation
173198
end
174199

175200
def kill_cursors
176-
read_with_retry do
201+
unregister
202+
read_with_one_retry do
177203
kill_cursors_operation.execute(@server)
178204
end
179205
end
180206

181207
def kill_cursors_operation
182208
if @server.features.find_command_enabled?
183-
Operation::Commands::Command.new(Builder::KillCursorsCommand.new(self).specification)
209+
Operation::Commands::Command.new(kill_cursors_op_spec)
210+
else
211+
Operation::KillCursors.new(kill_cursors_op_spec)
212+
end
213+
end
214+
215+
def kill_cursors_op_spec
216+
if @server.features.find_command_enabled?
217+
Builder::KillCursorsCommand.new(self).specification
184218
else
185-
Operation::KillCursors.new(Builder::OpKillCursors.new(self).specification)
219+
Builder::OpKillCursors.new(self).specification
186220
end
187221
end
188222

@@ -196,13 +230,22 @@ def more?
196230

197231
def process(result)
198232
@remaining -= result.returned_count if limited?
199-
@cursor_id = result.cursor_id
200233
@coll_name ||= result.namespace.sub("#{database.name}.", '') if result.namespace
234+
unregister if result.cursor_id == 0
235+
@cursor_id = result.cursor_id
201236
result.documents
202237
end
203238

204239
def use_limit?
205240
limited? && batch_size >= @remaining
206241
end
242+
243+
def register
244+
cluster.register_cursor(@cursor_id)
245+
end
246+
247+
def unregister
248+
cluster.unregister_cursor(@cursor_id)
249+
end
207250
end
208251
end

lib/mongo/cursor/builder/kill_cursors_command.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,34 @@ def specification
5656
def kill_cursors_command
5757
{ :killCursors => collection_name, :cursors => [ cursor.id ] }
5858
end
59+
60+
class << self
61+
62+
# Update a specification's list of cursor ids.
63+
#
64+
# @example Update a specification's list of cursor ids.
65+
# KillCursorsCommand.update_cursors(spec, ids)
66+
#
67+
# @return [ Hash ] The specification.
68+
# @return [ Array ] The ids to update with.
69+
#
70+
# @since 2.3.0
71+
def update_cursors(spec, ids)
72+
spec[:selector].merge!(cursors: spec[:selector][:cursors] & ids)
73+
end
74+
75+
# Get the list of cursor ids from a spec generated by this Builder.
76+
#
77+
# @example Get the list of cursor ids.
78+
# KillCursorsCommand.cursors(spec)
79+
#
80+
# @return [ Hash ] The specification.
81+
#
82+
# @since 2.3.0
83+
def get_cursors_list(spec)
84+
spec[:selector][:cursors]
85+
end
86+
end
5987
end
6088
end
6189
end

lib/mongo/cursor/builder/op_kill_cursors.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,34 @@ def initialize(cursor)
5050
def specification
5151
{ :coll_name => collection_name, :db_name => database.name, :cursor_ids => [ cursor.id ] }
5252
end
53+
54+
class << self
55+
56+
# Update a specification's list of cursor ids.
57+
#
58+
# @example Update a specification's list of cursor ids.
59+
# OpKillCursors.update_cursors(spec, ids)
60+
#
61+
# @return [ Hash ] The specification.
62+
# @return [ Array ] The ids to update with.
63+
#
64+
# @since 2.3.0
65+
def update_cursors(spec, ids)
66+
spec.merge!(cursor_ids: spec[:cursor_ids] & ids)
67+
end
68+
69+
# Get the list of cursor ids from a spec generated by this Builder.
70+
#
71+
# @example Get the list of cursor ids.
72+
# OpKillCursors.cursors(spec)
73+
#
74+
# @return [ Hash ] The specification.
75+
#
76+
# @since 2.3.0
77+
def get_cursors_list(spec)
78+
spec[:cursor_ids]
79+
end
80+
end
5381
end
5482
end
5583
end

0 commit comments

Comments
 (0)