Skip to content

Commit 56f36ec

Browse files
sosyzLinkinStars
authored andcommitted
refactor(plugin): improve KV storage with better caching and param handling
1 parent 42d6ad6 commit 56f36ec

File tree

1 file changed

+112
-86
lines changed

1 file changed

+112
-86
lines changed

plugin/kv_storage.go

Lines changed: 112 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,64 @@ package plugin
2020

2121
import (
2222
"context"
23-
"encoding/json"
2423
"fmt"
25-
"math/rand"
24+
"math/rand/v2"
2625
"time"
2726

2827
"github.com/apache/answer/internal/entity"
28+
"github.com/segmentfault/pacman/cache"
2929
"github.com/segmentfault/pacman/log"
3030
"xorm.io/builder"
3131
"xorm.io/xorm"
3232
)
3333

34-
// define error
34+
// Error variables for KV storage operations
3535
var (
36-
ErrKVKeyNotFound = fmt.Errorf("key not found in KV storage")
37-
ErrKVGroupEmpty = fmt.Errorf("group name is empty")
38-
ErrKVKeyEmpty = fmt.Errorf("key name is empty")
39-
ErrKVKeyAndGroupEmpty = fmt.Errorf("both key and group are empty")
40-
ErrKVTransactionFailed = fmt.Errorf("KV storage transaction failed")
41-
ErrKVDataNotInitialized = fmt.Errorf("KV storage data not initialized")
42-
ErrKVDBNotInitialized = fmt.Errorf("KV storage database connection not initialized")
36+
// ErrKVKeyNotFound is returned when the requested key does not exist in the KV storage
37+
ErrKVKeyNotFound = fmt.Errorf("key not found in KV storage")
38+
// ErrKVGroupEmpty is returned when a required group name is empty
39+
ErrKVGroupEmpty = fmt.Errorf("group name is empty")
40+
// ErrKVKeyEmpty is returned when a required key name is empty
41+
ErrKVKeyEmpty = fmt.Errorf("key name is empty")
42+
// ErrKVKeyAndGroupEmpty is returned when both key and group names are empty
43+
ErrKVKeyAndGroupEmpty = fmt.Errorf("both key and group are empty")
44+
// ErrKVTransactionFailed is returned when a KV storage transaction operation fails
45+
ErrKVTransactionFailed = fmt.Errorf("KV storage transaction failed")
4346
)
4447

48+
// KVParams is the parameters for KV storage operations
49+
type KVParams struct {
50+
Group string
51+
Key string
52+
Value string
53+
Page int
54+
PageSize int
55+
}
56+
57+
// KVOperator provides methods to interact with the key-value storage system for plugins
4558
type KVOperator struct {
4659
data *Data
4760
session *xorm.Session
4861
pluginSlugName string
62+
cacheTTL time.Duration
63+
}
64+
65+
// KVStorageOption defines a function type that configures a KVOperator
66+
type KVStorageOption func(*KVOperator)
67+
68+
// WithCacheTTL is the option to set the cache TTL; the default value is 30 minutes.
69+
// If ttl is less than 0, the cache will not be used
70+
func WithCacheTTL(ttl time.Duration) KVStorageOption {
71+
return func(kv *KVOperator) {
72+
kv.cacheTTL = ttl
73+
}
74+
}
75+
76+
// Option is used to set the options for the KV storage
77+
func (kv *KVOperator) Option(opts ...KVStorageOption) {
78+
for _, opt := range opts {
79+
opt(kv)
80+
}
4981
}
5082

5183
func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) {
@@ -62,28 +94,53 @@ func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) {
6294
return session, cleanup
6395
}
6496

65-
func (kv *KVOperator) getCacheTTL() time.Duration {
66-
return 30*time.Minute + time.Duration(rand.Intn(300))*time.Second
97+
func (kv *KVOperator) getCacheKey(params KVParams) string {
98+
return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", kv.pluginSlugName, params.Group, params.Key)
99+
}
100+
101+
func (kv *KVOperator) setCache(ctx context.Context, params KVParams) {
102+
if kv.cacheTTL < 0 {
103+
return
104+
}
105+
106+
ttl := kv.cacheTTL
107+
if ttl > 10 {
108+
ttl += time.Duration(float64(ttl) * 0.1 * (1 - rand.Float64()))
109+
}
110+
111+
cacheKey := kv.getCacheKey(params)
112+
if err := kv.data.Cache.SetString(ctx, cacheKey, params.Value, ttl); err != nil {
113+
log.Warnf("cache set failed: %v, key: %s", err, cacheKey)
114+
}
115+
}
116+
117+
func (kv *KVOperator) getCache(ctx context.Context, params KVParams) (string, bool, error) {
118+
if kv.cacheTTL < 0 {
119+
return "", false, nil
120+
}
121+
122+
cacheKey := kv.getCacheKey(params)
123+
return kv.data.Cache.GetString(ctx, cacheKey)
67124
}
68125

69-
func (kv *KVOperator) getCacheKey(group, key string) string {
70-
if group == "" {
71-
return fmt.Sprintf("plugin_kv_storage:%s:key:%s", kv.pluginSlugName, key)
126+
func (kv *KVOperator) cleanCache(ctx context.Context, params KVParams) {
127+
if kv.cacheTTL < 0 {
128+
return
72129
}
73-
if key == "" {
74-
return fmt.Sprintf("plugin_kv_storage:%s:group:%s", kv.pluginSlugName, group)
130+
131+
if err := kv.data.Cache.Del(ctx, kv.getCacheKey(params)); err != nil {
132+
log.Warnf("Failed to delete cache for key %s: %v", params.Key, err)
75133
}
76-
return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", kv.pluginSlugName, group, key)
77134
}
78135

79-
func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error) {
80-
// validate
81-
if key == "" {
136+
// Get retrieves a value from KV storage by group and key.
137+
// Returns the value as a string or an error if the key is not found.
138+
func (kv *KVOperator) Get(ctx context.Context, params KVParams) (string, error) {
139+
if params.Key == "" {
82140
return "", ErrKVKeyEmpty
83141
}
84142

85-
cacheKey := kv.getCacheKey(group, key)
86-
if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist {
143+
if value, exist, err := kv.getCache(ctx, params); err == nil && exist {
87144
return value, nil
88145
}
89146

@@ -94,8 +151,8 @@ func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error
94151

95152
query.Where(builder.Eq{
96153
"plugin_slug_name": kv.pluginSlugName,
97-
"`group`": group,
98-
"`key`": key,
154+
"`group`": params.Group,
155+
"`key`": params.Key,
99156
})
100157

101158
has, err := query.Get(&data)
@@ -106,15 +163,15 @@ func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error
106163
return "", ErrKVKeyNotFound
107164
}
108165

109-
if err := kv.data.Cache.SetString(ctx, cacheKey, data.Value, kv.getCacheTTL()); err != nil {
110-
log.Error(err)
111-
}
166+
kv.setCache(ctx, params)
112167

113168
return data.Value, nil
114169
}
115170

116-
func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error {
117-
if key == "" {
171+
// Set stores a value in KV storage with the specified group and key.
172+
// Updates the value if it already exists.
173+
func (kv *KVOperator) Set(ctx context.Context, params KVParams) error {
174+
if params.Key == "" {
118175
return ErrKVKeyEmpty
119176
}
120177

@@ -123,17 +180,17 @@ func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error {
123180

124181
data := &entity.PluginKVStorage{
125182
PluginSlugName: kv.pluginSlugName,
126-
Group: group,
127-
Key: key,
128-
Value: value,
183+
Group: params.Group,
184+
Key: params.Key,
185+
Value: params.Value,
129186
}
130187

131-
kv.cleanCache(ctx, group, key)
188+
kv.cleanCache(ctx, params)
132189

133190
affected, err := query.Where(builder.Eq{
134191
"plugin_slug_name": kv.pluginSlugName,
135-
"`group`": group,
136-
"`key`": key,
192+
"`group`": params.Group,
193+
"`key`": params.Key,
137194
}).Cols("value").Update(data)
138195
if err != nil {
139196
return err
@@ -148,76 +205,52 @@ func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error {
148205
return nil
149206
}
150207

151-
func (kv *KVOperator) Del(ctx context.Context, group, key string) error {
152-
if key == "" && group == "" {
208+
// Del removes values from KV storage by group and/or key.
209+
// If both group and key are provided, only that specific entry is deleted.
210+
// If only group is provided, all entries in that group are deleted.
211+
// At least one of group or key must be provided.
212+
func (kv *KVOperator) Del(ctx context.Context, params KVParams) error {
213+
if params.Key == "" && params.Group == "" {
153214
return ErrKVKeyAndGroupEmpty
154215
}
155216

156-
kv.cleanCache(ctx, group, key)
217+
kv.cleanCache(ctx, params)
157218

158219
session, cleanup := kv.getSession(ctx)
159220
defer cleanup()
160221

161222
session.Where(builder.Eq{
162223
"plugin_slug_name": kv.pluginSlugName,
163224
})
164-
if group != "" {
165-
session.Where(builder.Eq{"`group`": group})
225+
if params.Group != "" {
226+
session.Where(builder.Eq{"`group`": params.Group})
166227
}
167-
if key != "" {
168-
session.Where(builder.Eq{"`key`": key})
228+
if params.Key != "" {
229+
session.Where(builder.Eq{"`key`": params.Key})
169230
}
170231

171232
_, err := session.Delete(&entity.PluginKVStorage{})
172233
return err
173234
}
174235

175-
func (kv *KVOperator) cleanCache(ctx context.Context, group, key string) {
176-
if key != "" {
177-
if err := kv.data.Cache.Del(ctx, kv.getCacheKey("", key)); err != nil {
178-
log.Warnf("Failed to delete cache for key %s: %v", key, err)
179-
}
180-
181-
if group != "" {
182-
if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, key)); err != nil {
183-
log.Warnf("Failed to delete cache for group %s, key %s: %v", group, key, err)
184-
}
185-
}
186-
}
187-
188-
if group != "" {
189-
if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, "")); err != nil {
190-
log.Warnf("Failed to delete cache for group %s: %v", group, err)
191-
}
192-
}
193-
}
194-
195-
func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSize int) (map[string]string, error) {
196-
if group == "" {
236+
func (kv *KVOperator) GetByGroup(ctx context.Context, params KVParams) (map[string]string, error) {
237+
if params.Group == "" {
197238
return nil, ErrKVGroupEmpty
198239
}
199240

200-
if page < 1 {
201-
page = 1
241+
if params.Page < 1 {
242+
params.Page = 1
202243
}
203-
if pageSize < 1 {
204-
pageSize = 10
205-
}
206-
207-
cacheKey := kv.getCacheKey(group, "")
208-
if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist {
209-
result := make(map[string]string)
210-
if err := json.Unmarshal([]byte(value), &result); err == nil {
211-
return result, nil
212-
}
244+
if params.PageSize < 1 {
245+
params.PageSize = 10
213246
}
214247

215248
query, cleanup := kv.getSession(ctx)
216249
defer cleanup()
217250

218251
var items []entity.PluginKVStorage
219-
err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": group}).
220-
Limit(pageSize, (page-1)*pageSize).
252+
err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": params.Group}).
253+
Limit(params.PageSize, (params.Page-1)*params.PageSize).
221254
OrderBy("id ASC").
222255
Find(&items)
223256
if err != nil {
@@ -227,13 +260,6 @@ func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSi
227260
result := make(map[string]string, len(items))
228261
for _, item := range items {
229262
result[item.Key] = item.Value
230-
if err := kv.data.Cache.SetString(ctx, kv.getCacheKey(group, item.Key), item.Value, kv.getCacheTTL()); err != nil {
231-
log.Warnf("Failed to set cache for group %s, key %s: %v", group, item.Key, err)
232-
}
233-
}
234-
235-
if resultJSON, err := json.Marshal(result); err == nil {
236-
_ = kv.data.Cache.SetString(ctx, cacheKey, string(resultJSON), kv.getCacheTTL())
237263
}
238264

239265
return result, nil

0 commit comments

Comments
 (0)