Skip to content

Commit 6fd69e6

Browse files
authored
Support caching for map primitive (#10)
* Add support for LRU cached maps * Add LRU cached map support * Add map options to client API * Simplify cached map using session guarantees * Cache entire map to ensure consistent cache size * Update cached map implementation to match spec
1 parent 927b103 commit 6fd69e6

File tree

9 files changed

+589
-12
lines changed

9 files changed

+589
-12
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/gogo/protobuf v1.3.1
1010
github.com/golang/protobuf v1.3.2
1111
github.com/google/uuid v1.1.1
12+
github.com/hashicorp/golang-lru v0.5.4
1213
github.com/stretchr/testify v1.4.0
1314
google.golang.org/grpc v1.27.0
1415
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xC
174174
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
175175
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
176176
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
177+
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
178+
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
177179
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
178180
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
179181
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=

pkg/client/client.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,8 @@ func (d *Database) GetLog(ctx context.Context, name string) (log.Log, error) {
380380
}
381381

382382
// GetMap gets or creates a Map with the given name
383-
func (d *Database) GetMap(ctx context.Context, name string) (_map.Map, error) {
384-
return _map.New(ctx, primitive.NewName(d.Namespace, d.Name, d.scope, name), d.sessions)
383+
func (d *Database) GetMap(ctx context.Context, name string, opts ..._map.Option) (_map.Map, error) {
384+
return _map.New(ctx, primitive.NewName(d.Namespace, d.Name, d.scope, name), d.sessions, opts...)
385385
}
386386

387387
// GetSet gets or creates a Set with the given name
@@ -442,8 +442,8 @@ func (g *PartitionGroup) GetLog(ctx context.Context, name string) (log.Log, erro
442442
}
443443

444444
// GetMap gets or creates a Map with the given name
445-
func (g *PartitionGroup) GetMap(ctx context.Context, name string) (_map.Map, error) {
446-
return _map.New(ctx, primitive.NewName(g.Namespace, g.Name, g.scope, name), g.sessions)
445+
func (g *PartitionGroup) GetMap(ctx context.Context, name string, opts ..._map.Option) (_map.Map, error) {
446+
return _map.New(ctx, primitive.NewName(g.Namespace, g.Name, g.scope, name), g.sessions, opts...)
447447
}
448448

449449
// GetSet gets or creates a Set with the given name

pkg/client/map/cache.go

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Copyright 2019-present Open Networking Foundation.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package _map //nolint:golint
16+
17+
import (
18+
"context"
19+
"github.com/hashicorp/golang-lru"
20+
"sync"
21+
)
22+
23+
// newCachingMap returns a decorated map that caches updates to the given map
24+
func newCachingMap(_map Map, size int) (Map, error) {
25+
cache, err := lru.New(size)
26+
if err != nil {
27+
return nil, err
28+
}
29+
cachingMap := &cachingMap{
30+
delegatingMap: newDelegatingMap(_map),
31+
pending: make(map[string]*cachedEntry),
32+
cache: cache,
33+
}
34+
if err := cachingMap.open(); err != nil {
35+
return nil, err
36+
}
37+
return cachingMap, nil
38+
}
39+
40+
// cachingMap is an implementation of the Map interface that caches entries
41+
type cachingMap struct {
42+
*delegatingMap
43+
cancel context.CancelFunc
44+
pending map[string]*cachedEntry
45+
cache *lru.Cache
46+
cacheVersion int64
47+
mu sync.RWMutex
48+
}
49+
50+
// open opens the map listeners
51+
func (m *cachingMap) open() error {
52+
ch := make(chan *Event)
53+
ctx, cancel := context.WithCancel(context.Background())
54+
m.mu.Lock()
55+
m.cancel = cancel
56+
m.mu.Unlock()
57+
if err := m.delegatingMap.Watch(ctx, ch, WithReplay()); err != nil {
58+
return err
59+
}
60+
go func() {
61+
for event := range ch {
62+
m.cacheUpdate(event.Entry, event.Type == EventRemoved)
63+
}
64+
}()
65+
return nil
66+
}
67+
68+
// cacheUpdate caches the given updated entry
69+
func (m *cachingMap) cacheUpdate(update *Entry, tombstone bool) {
70+
m.mu.Lock()
71+
defer m.mu.Unlock()
72+
73+
// If the update version is less than the cache version, the cache contains
74+
// more recent updates. Ignore the update.
75+
if update.Version <= m.cacheVersion {
76+
return
77+
}
78+
79+
// If the pending entry is newer than the update entry, the update can be ignored.
80+
// Otherwise, remove the entry from the pending cache if present.
81+
if pending, ok := m.pending[update.Key]; ok {
82+
if pending.Version > update.Version {
83+
return
84+
}
85+
delete(m.pending, update.Key)
86+
}
87+
88+
// If the entry is a tombstone, remove it from the cache, otherwise insert it.
89+
if tombstone {
90+
m.cache.Remove(update.Key)
91+
} else {
92+
m.cache.Add(update.Key, update)
93+
}
94+
95+
// Update the cache version.
96+
m.cacheVersion = update.Version
97+
}
98+
99+
// cacheRead caches the given read entry
100+
func (m *cachingMap) cacheRead(read *Entry, tombstone bool) {
101+
m.mu.Lock()
102+
defer m.mu.Unlock()
103+
104+
// If the entry version is less than the cache version, ignore the update. The entry will
105+
// have been cached as an update.
106+
if read.Version <= m.cacheVersion {
107+
return
108+
}
109+
110+
// The pending cache contains the most recent known state for the entry.
111+
// If the read entry is newer than the pending entry for the key, update
112+
// the pending cache.
113+
if pending, ok := m.pending[read.Key]; !ok || read.Version > pending.Version {
114+
m.pending[read.Key] = &cachedEntry{
115+
Entry: read,
116+
tombstone: tombstone,
117+
}
118+
}
119+
}
120+
121+
// getCache gets a cached entry
122+
func (m *cachingMap) getCache(key string) (*Entry, bool) {
123+
m.mu.RLock()
124+
defer m.mu.RUnlock()
125+
126+
// The pending cache contains the most recent known states. If the entry is present
127+
// in the pending cache, return it rather than using the LRU cache.
128+
if entry, ok := m.pending[key]; ok {
129+
if entry.tombstone {
130+
return nil, true
131+
}
132+
return entry.Entry, true
133+
}
134+
135+
// If the entry is present in the LRU cache, return it.
136+
if entry, ok := m.cache.Get(key); ok {
137+
return entry.(*Entry), true
138+
}
139+
return nil, false
140+
}
141+
142+
func (m *cachingMap) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, error) {
143+
// If the entry is already in the cache, return it
144+
if entry, ok := m.getCache(key); ok {
145+
return entry, nil
146+
}
147+
148+
// Otherwise, fetch the entry from the underlying map
149+
entry, err := m.delegatingMap.Get(ctx, key, opts...)
150+
if err != nil {
151+
return nil, err
152+
}
153+
154+
// Update the cache if necessary
155+
if err != nil {
156+
return nil, err
157+
}
158+
m.cacheRead(entry, entry.Value == nil)
159+
return entry, nil
160+
}
161+
162+
func (m *cachingMap) Put(ctx context.Context, key string, value []byte, opts ...PutOption) (*Entry, error) {
163+
// Put the entry in the map using the underlying map delegate
164+
entry, err := m.delegatingMap.Put(ctx, key, value, opts...)
165+
if err != nil {
166+
return nil, err
167+
}
168+
169+
// Update the cache if necessary
170+
if err != nil {
171+
return nil, err
172+
}
173+
m.cacheRead(entry, false)
174+
return entry, nil
175+
}
176+
177+
func (m *cachingMap) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) {
178+
// Remove the entry from the map using the underlying map delegate
179+
entry, err := m.delegatingMap.Remove(ctx, key, opts...)
180+
if err != nil {
181+
return nil, err
182+
}
183+
184+
// Update the cache if necessary
185+
if err != nil {
186+
return nil, err
187+
}
188+
m.cacheRead(entry, true)
189+
return entry, nil
190+
}
191+
192+
func (m *cachingMap) Close(ctx context.Context) error {
193+
m.mu.Lock()
194+
if m.cancel != nil {
195+
m.cancel()
196+
}
197+
m.mu.Unlock()
198+
return m.delegatingMap.Close(ctx)
199+
}
200+
201+
// cachedEntry is a cached entry
202+
type cachedEntry struct {
203+
*Entry
204+
tombstone bool
205+
}

0 commit comments

Comments
 (0)