@@ -58,177 +58,6 @@ const (
5858 EventReset
5959)
6060
61- var (
62- // luaAppend is the Lua script used to append an item to an array key and
63- // return its new value.
64- luaAppend = redis .NewScript (`
65- local v = redis.call("HGET", KEYS[1], ARGV[1])
66-
67- -- If the value exists, append the new value, otherwise assign ARGV[2] directly
68- v = (v and v .. "," .. ARGV[2]) or ARGV[2]
69-
70- -- Set the updated value in the hash and publish the change
71- redis.call("HSET", KEYS[1], ARGV[1], v)
72- redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)
73-
74- return v
75- ` )
76-
77- // luaAppendUnique is the Lua script used to append an item to a set and return
78- // the result.
79- luaAppendUnique = redis .NewScript (`
80- local v = redis.call("HGET", KEYS[1], ARGV[1])
81- local newValues = {}
82- local changed = false
83-
84- -- Split ARGV[2] into a table of new values
85- for value in string.gmatch(ARGV[2], "[^,]+") do
86- table.insert(newValues, value)
87- end
88-
89- -- If the value exists, process it, else set it directly
90- if v then
91- local existingValues = {}
92- -- Split existing values into a table
93- for value in string.gmatch(v, "[^,]+") do
94- existingValues[value] = true
95- end
96-
97- -- Append unique new values to v
98- for _, newValue in ipairs(newValues) do
99- if not existingValues[newValue] then
100- v = (v == "") and newValue or v .. "," .. newValue
101- changed = true
102- end
103- end
104- else
105- v = table.concat(newValues, ",")
106- changed = true
107- end
108-
109- -- If changes were made, update the hash and publish the event
110- if changed then
111- redis.call("HSET", KEYS[1], ARGV[1], v)
112- redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)
113- end
114-
115- return v
116- ` )
117-
118- // luaDelete is the Lua script used to delete a key and return its previous
119- // value.
120- luaDelete = redis .NewScript (`
121- local v = redis.call("HGET", KEYS[1], ARGV[1])
122- redis.call("HDEL", KEYS[1], ARGV[1])
123- redis.call("PUBLISH", KEYS[2], ARGV[1].."=")
124- return v
125- ` )
126-
127- // luaIncr is the Lua script used to increment a key and return the new value.
128- luaIncr = redis .NewScript (`
129- redis.call("HINCRBY", KEYS[1], ARGV[1], ARGV[2])
130- local v = redis.call("HGET", KEYS[1], ARGV[1])
131- redis.call("PUBLISH", KEYS[2], ARGV[1].."="..v)
132- return v
133- ` )
134-
135- // luaRemove is the Lua script used to remove items from an array value and
136- // return the result along with a flag indicating if any value was removed.
137- luaRemove = redis .NewScript (`
138- local v = redis.call("HGET", KEYS[1], ARGV[1])
139- local removed = false
140-
141- if v then
142- -- Create a set of current values
143- local curr = {}
144- for s in string.gmatch(v, "[^,]+") do
145- curr[s] = true
146- end
147-
148- -- Remove specified values
149- for s in string.gmatch(ARGV[2], "[^,]+") do
150- if curr[s] then
151- curr[s] = nil
152- removed = true
153- end
154- end
155-
156- -- Collect the remaining values
157- local newValues = {}
158- for key, _ in pairs(curr) do
159- table.insert(newValues, key)
160- end
161-
162- -- Update the hash or delete the key if empty
163- if #newValues == 0 then
164- redis.call("HDEL", KEYS[1], ARGV[1])
165- v = ""
166- else
167- v = table.concat(newValues, ",")
168- redis.call("HSET", KEYS[1], ARGV[1], v)
169- end
170-
171- -- Publish the result
172- redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)
173- end
174-
175- return {v, removed}
176- ` )
177-
178- // luaReset is the Lua script used to reset the map.
179- luaReset = redis .NewScript (`
180- redis.call("DEL", KEYS[1])
181- redis.call("PUBLISH", KEYS[2], "*=")
182- ` )
183-
184- // luaSet is the Lua script used to set a key and return its previous value. We
185- // use Lua scripts to publish notifications "at the same time" and preserve the
186- // order of operations (scripts are run atomically within Redis).
187- luaSet = redis .NewScript (`
188- local v = redis.call("HGET", KEYS[1], ARGV[1])
189- redis.call("HSET", KEYS[1], ARGV[1], ARGV[2])
190- redis.call("PUBLISH", KEYS[2], ARGV[1].."="..ARGV[2])
191- return v
192- ` )
193-
194- // luaTestAndDel is the Lua script used to delete a key if it has a specific value.
195- luaTestAndDel = redis .NewScript (`
196- local v = redis.call("HGET", KEYS[1], ARGV[1])
197- if v == ARGV[2] then
198- redis.call("HDEL", KEYS[1], ARGV[1])
199- redis.call("PUBLISH", KEYS[2], ARGV[1].."=")
200- end
201- return v
202- ` )
203-
204- // luaTestAndReset is the Lua script used to reset the map if all the given keys
205- // have the given values.
206- luaTestAndReset = redis .NewScript (`
207- local hash = KEYS[1]
208- local n = (#ARGV - 1) / 2
209-
210- for i = 2, n + 1 do
211- if redis.call("HGET", hash, ARGV[i]) ~= ARGV[i + n] then
212- return 0
213- end
214- end
215-
216- redis.call("DEL", hash)
217- redis.call("PUBLISH", KEYS[2], "*=")
218- return 1
219- ` )
220-
221- // luaTestAndSet is the Lua script used to set a key if it has a specific value.
222- luaTestAndSet = redis .NewScript (`
223- local v = redis.call("HGET", KEYS[1], ARGV[1])
224- if v == ARGV[2] then
225- redis.call("HSET", KEYS[1], ARGV[1], ARGV[3])
226- redis.call("PUBLISH", KEYS[2], ARGV[1].."="..ARGV[3])
227- end
228- return v
229- ` )
230- )
231-
23261// Join retrieves the content of the replicated map with the given name and
23362// subscribes to updates. The local content is eventually consistent across all
23463// nodes that join the replicated map with the same name.
@@ -274,7 +103,7 @@ func Join(ctx context.Context, name string, rdb *redis.Client, opts ...MapOption
274103
275104 // read updates
276105 sm .wait .Add (1 )
277- go sm .run ( )
106+ pulse . Go ( ctx , sm .run )
278107
279108 sm .logger .Info ("joined" )
280109 return sm , nil
0 commit comments