Skip to content

Commit

Permalink
feat(storers): Use UUID for memcached keys
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Jordan committed Apr 24, 2024
1 parent 05bd507 commit 086ede5
Showing 1 changed file with 120 additions and 54 deletions.
174 changes: 120 additions & 54 deletions pkg/storage/nutsMemcachedProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import (
"bufio"
"bytes"
"encoding/json"
"errors"
"net/http"
"strconv"
"strings"
"time"

"github.com/bradfitz/gomemcache/memcache"
t "github.com/darkweak/souin/configurationtypes"
"github.com/darkweak/souin/pkg/rfc"
"github.com/darkweak/souin/pkg/storage/types"
"github.com/dgraph-io/ristretto"
"github.com/imdario/mergo"
"github.com/nutsdb/nutsdb"
"go.uber.org/zap"
Expand All @@ -22,9 +24,10 @@ var nutsMemcachedInstanceMap = map[string]*nutsdb.DB{}
// NutsMemcached provider type
type NutsMemcached struct {
*nutsdb.DB
stale time.Duration
logger *zap.Logger
memcacheClient *memcache.Client
stale time.Duration
logger *zap.Logger
//memcacheClient *memcache.Client
ristrettoCache *ristretto.Cache
}

// const (
Expand Down Expand Up @@ -82,6 +85,7 @@ func NutsMemcachedConnectionFactory(c t.AbstractConfigurationInterface) (types.S
nutsOptions.EntryIdxMode = nutsdb.HintKeyAndRAMIdxMode
// `HintBPTSparseIdxMode` represents b+ tree sparse index mode.
// Note: this mode was removed after v0.14.0
// Use: github.com/nutsdb/nutsdb v0.14.0
//nutsOptions.EntryIdxMode = nutsdb.HintBPTSparseIdxMode

if nutsConfiguration.Configuration != nil {
Expand Down Expand Up @@ -118,11 +122,35 @@ func NutsMemcachedConnectionFactory(c t.AbstractConfigurationInterface) (types.S
return nil, e
}

var numCounters int64 = 1e7 // number of keys to track frequency of (10M).
var maxCost int64 = 1 << 30 // maximum cost of cache (1GB).
if nutsConfiguration.Configuration != nil {
rawNumCounters, ok := nutsConfiguration.Configuration.(map[string]interface{})["NumCounters"]
if ok {
numCounters, _ = strconv.ParseInt(rawNumCounters.(string), 10, 64)
}

rawMaxCost, ok := nutsConfiguration.Configuration.(map[string]interface{})["MaxCost"]
if ok {
maxCost, _ = strconv.ParseInt(rawMaxCost.(string), 10, 64)
}
}
// See https://github.com/dgraph-io/ristretto?tab=readme-ov-file#example
ristrettoCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: numCounters,
MaxCost: maxCost,
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
panic(err)
}

instance := &NutsMemcached{
DB: db,
stale: dc.GetStale(),
logger: c.GetLogger(),
memcacheClient: memcache.New("127.0.0.1:11211"), // hardcoded for now
DB: db,
stale: dc.GetStale(),
logger: c.GetLogger(),
//memcacheClient: memcache.New("127.0.0.1:11211"), // hardcoded for now
ristrettoCache: ristrettoCache,
}
nutsMemcachedInstanceMap[nutsOptions.Dir] = instance.DB

Expand Down Expand Up @@ -179,29 +207,10 @@ func (provider *NutsMemcached) MapKeys(prefix string) map[string]string {

// Get method returns the populated response if exists, empty response then
func (provider *NutsMemcached) Get(key string) (item []byte) {
// get from nuts
keyFound := false
{
_ = provider.DB.View(func(tx *nutsdb.Tx) error {
i, e := tx.Get(bucket, []byte(key))
if i != nil {
// Value is stored in memcached
//item = i.Value
keyFound = true
}
return e
})
}

// get from memcached
if keyFound {
// Reminder: the key must be at most 250 bytes in length
//fmt.Println("memcached GET", key)
i, e := provider.memcacheClient.Get(key)
if e == nil && i != nil {
item = i.Value
}
memcachedKey, _ := provider.getFromNuts(key)

if memcachedKey != "" {
item, _ = provider.getFromMemcached(memcachedKey)
}

return
Expand All @@ -220,14 +229,14 @@ func (provider *NutsMemcached) Prefix(key string, req *http.Request, validator *
for _, entry := range entries {
if varyVoter(key, req, string(entry.Key)) {
// TODO: improve this
// store header only in nuts and avoid query to memcached on each vary
// Store only response header in nuts and avoid query to memcached on each vary
// E.g, rfc.ValidateETag on NutsDB header value, retrieve response body later from memcached.

// Reminder: the key must be at most 250 bytes in length
//fmt.Println("memcached PREFIX", key, "GET", string(entry.Key))
i, e := provider.memcacheClient.Get(string(entry.Key))
if e == nil && i != nil {
res, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(i.Value)), req)
i, e := provider.getFromMemcached(string(entry.Value))
if e == nil {
res, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(i)), req)
if err == nil {
rfc.ValidateETag(res, validator)
if validator.Matched {
Expand Down Expand Up @@ -257,12 +266,16 @@ func (provider *NutsMemcached) Set(key string, value []byte, url t.URL, ttl time
if ttl == 0 {
ttl = url.TTL.Duration
}
// Only for memcached (to overcome 250 bytes key limit)
//memcachedKey := uuid.New().String()
memcachedKey := key

// set to nuts (normal TTL)
{
err := provider.DB.Update(func(tx *nutsdb.Tx) error {
// No value is stored, value is stored in memcached
return tx.Put(bucket, []byte(key), []byte{}, uint32(ttl.Seconds()))

// key: cache-key, value: memcached-key
return tx.Put(bucket, []byte(key), []byte(memcachedKey), uint32(ttl.Seconds()))
})

if err != nil {
Expand All @@ -275,8 +288,8 @@ func (provider *NutsMemcached) Set(key string, value []byte, url t.URL, ttl time
staleTtl := int32((provider.stale + ttl).Seconds())
{
err := provider.DB.Update(func(tx *nutsdb.Tx) error {
// No value is stored, value is stored in memcached
return tx.Put(bucket, []byte(StalePrefix+key), []byte{}, uint32(staleTtl))
// key: "STALE_" + cache-key, value: memcached-key
return tx.Put(bucket, []byte(StalePrefix+key), []byte(memcachedKey), uint32(staleTtl))
})

if err != nil {
Expand All @@ -285,38 +298,36 @@ func (provider *NutsMemcached) Set(key string, value []byte, url t.URL, ttl time
}

// set to memcached with stale TTL
{
// Reminder: the key must be at most 250 bytes in length
//fmt.Println("memcached SET", key)
err := provider.memcacheClient.Set(
&memcache.Item{
Key: key,
Value: value,
Expiration: staleTtl,
},
)
if err != nil {
provider.logger.Sugar().Errorf("Impossible to set value into Memcached, %v", err)
}
}
_ = provider.setToMemcached(memcachedKey, value, staleTtl)

return nil
}

// Delete method will delete the response in Nuts provider if exists corresponding to key param
func (provider *NutsMemcached) Delete(key string) {
memcachedKey, _ := provider.getFromNuts(key)

// delete from memcached
if memcachedKey != "" {
_ = provider.delFromMemcached(memcachedKey)
}

// delete from nuts
_ = provider.DB.Update(func(tx *nutsdb.Tx) error {
return tx.Delete(bucket, []byte(key))
})
}

// DeleteMany method will delete the responses in Nuts provider if exists corresponding to the regex key param
func (provider *NutsMemcached) DeleteMany(key string) {
func (provider *NutsMemcached) DeleteMany(keyReg string) {
_ = provider.DB.Update(func(tx *nutsdb.Tx) error {
if entries, err := tx.PrefixSearchScan(bucket, []byte(""), key, 0, nutsLimit); err != nil {
if entries, err := tx.PrefixSearchScan(bucket, []byte(""), keyReg, 0, nutsLimit); err != nil {
return err
} else {
for _, entry := range entries {
// delete from memcached
_ = provider.delFromMemcached(string(entry.Value))
// delete from nuts
_ = tx.Delete(bucket, entry.Key)
}
}
Expand All @@ -335,3 +346,58 @@ func (provider *NutsMemcached) Reset() error {
return tx.DeleteBucket(1, bucket)
})
}

func (provider *NutsMemcached) getFromNuts(nutsKey string) (memcachedKey string, err error) {
err = provider.DB.View(func(tx *nutsdb.Tx) error {
i, e := tx.Get(bucket, []byte(nutsKey))
if i != nil {
memcachedKey = string(i.Value)
}
return e
})
return
}

// Reminder: the memcachedKey must be at most 250 bytes in length
func (provider *NutsMemcached) setToMemcached(memcachedKey string, value []byte, ttl int32) (err error) {
//fmt.Println("memcached SET", key)
// err = provider.memcacheClient.Set(
// &memcache.Item{
// Key: memcachedKey,
// Value: value,
// Expiration: ttl,
// },
// )
//if err != nil {
// provider.logger.Sugar().Errorf("Failed to set into memcached, %v", err)
// }
ok := provider.ristrettoCache.Set(memcachedKey, value, int64(len(value)))
if !ok {
provider.logger.Sugar().Debugf("Value not set to cache, key=%v", memcachedKey)
}
return
}

// Reminder: the memcachedKey must be at most 250 bytes in length
func (provider *NutsMemcached) getFromMemcached(memcachedKey string) (value []byte, err error) {
//fmt.Println("memcached GET", key)
// i, err := provider.memcacheClient.Get(memcachedKey)
// if err == nil && i != nil {
// value = i.Value
// } else {
// provider.logger.Sugar().Errorf("Failed to get from memcached, %v", err)
// }
rawValue, found := provider.ristrettoCache.Get(memcachedKey)
if !found {
provider.logger.Sugar().Debugf("Failed to get from cache, key=%v", memcachedKey)
return nil, errors.New("failed to get from cache")
}
value = rawValue.([]byte)
return
}

func (provider *NutsMemcached) delFromMemcached(memcachedKey string) (err error) {
//err = provider.memcacheClient.Delete(memcachedKey)
provider.ristrettoCache.Del(memcachedKey)
return
}

0 comments on commit 086ede5

Please sign in to comment.