Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
gocontext "context"
"fmt"
"math/rand"
"net/url"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"syscall"
Expand All @@ -21,6 +23,7 @@ import (
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/etcddb"
"github.com/aptly-dev/aptly/database/goleveldb"
"github.com/aptly-dev/aptly/database/ssdb"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/files"
"github.com/aptly-dev/aptly/http"
Expand All @@ -29,6 +32,7 @@ import (
"github.com/aptly-dev/aptly/swift"
"github.com/aptly-dev/aptly/task"
"github.com/aptly-dev/aptly/utils"
"github.com/seefan/gossdb/v2/conf"
"github.com/smira/commander"
"github.com/smira/flag"
)
Expand Down Expand Up @@ -301,6 +305,21 @@ func (context *AptlyContext) _database() (database.Storage, error) {
context.database, err = goleveldb.NewDB(dbPath)
case "etcd":
context.database, err = etcddb.NewDB(context.config().DatabaseBackend.URL)
case "ssdb":
var cfg conf.Config
u, e := url.Parse(context.config().DatabaseBackend.URL)

if e != nil {
return nil, e
}
cfg.Port, e = strconv.Atoi(u.Port())
cfg.Host = strings.Split(u.Host, ":")[0]
if e != nil {
return nil, e
}
password, _ := u.User.Password()
cfg.Password = password
context.database, err = ssdb.NewOpenDB(&cfg)
default:
context.database, err = goleveldb.NewDB(context.dbPath())
}
Expand Down
129 changes: 129 additions & 0 deletions database/ssdb/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package ssdb

import (
"fmt"

"github.com/aptly-dev/aptly/database"
"github.com/seefan/gossdb/v2/conf"
"github.com/seefan/gossdb/v2/pool"
)

const (
delOpt = "del"
)

type bWriteData struct {
key []byte
value []byte
opts string
err error
}

type Batch struct {
cfg *conf.Config
// key-value chan
w chan bWriteData
p map[string]interface{}
d []string
db *pool.Client
}

// func internalOpenBatch...
func internalOpenBatch(_ database.Storage) *Batch {
b := &Batch{
w: make(chan bWriteData),
p: make(map[string]interface{}),
}
b.run()

return b
}

func (b *Batch) run() {
go func() {
for {
select {
case w, ok := <-b.w:
{
if !ok {
ssdbLog("ssdb batch write chan closed")
return
}

if w.opts == "write" {
ssdbLog("ssdb batch write")
var err error
if len(b.p) > 0 && len(b.d) == 0 {
err = b.db.MultiSet(b.p)
ssdbLog("ssdb batch set errinfo: ", err)
} else if len(b.d) > 0 && len(b.p) == 0 {
err = b.db.MultiDel(b.d...)
ssdbLog("ssdb batch del errinfo: ", err)
} else if len(b.p) == 0 && len(b.d) == 0 {
err = nil
} else {
err = fmt.Errorf("ssdb batch does not support both put and delete operations")
}
ssdbLog("ssdb batch write errinfo: ", err)
b.w <- bWriteData{
err: err,
}
ssdbLog("ssdb batch write end")
} else {
ssdbLog("ssdb batch", w.opts)
if w.opts == "put" {
b.p[string(w.key)] = w.value
} else if w.opts == delOpt {
b.d = append(b.d, string(w.key))
}
}
}
}
}
}()
}

func (b *Batch) stop() {
ssdbLog("ssdb batch stop")
close(b.w)
}

func (b *Batch) Put(key, value []byte) (err error) {
// err = b.db.Set(string(key), string(value))
w := bWriteData{
key: key,
value: value,
opts: "put",
}

b.w <- w
return nil
}

func (b *Batch) Delete(key []byte) (err error) {
/* err = b.db.Del(string(key))
return */
w := bWriteData{
key: key,
opts: delOpt,
}

b.w <- w
return nil
}

func (b *Batch) Write() (err error) {
defer b.stop()
w := bWriteData{
opts: "write",
}

b.w <- w
result := <-b.w
return result.err
}

// batch should implement database.Batch
var (
_ database.Batch = &Batch{}
)
62 changes: 62 additions & 0 deletions database/ssdb/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package ssdb

import (
"os"
"strconv"

"github.com/aptly-dev/aptly/database"
"github.com/seefan/gossdb/v2"
"github.com/seefan/gossdb/v2/conf"
"github.com/seefan/gossdb/v2/pool"
)

var defaultBufSize = 102400
var defaultPoolSize = 1

func internalOpen(cfg *conf.Config) (*pool.Client, error) {
ssdbLog("internalOpen")

cfg.ReadBufferSize = defaultBufSize
cfg.WriteBufferSize = defaultBufSize
cfg.MaxPoolSize = defaultPoolSize
cfg.PoolSize = defaultPoolSize
cfg.MinPoolSize = defaultPoolSize
cfg.MaxWaitSize = 100 * defaultPoolSize
cfg.RetryEnabled = true

//override by env
if os.Getenv("SSDB_READBUFFERSIZE") != "" {
readBufSize, err := strconv.Atoi(os.Getenv("SSDB_READBUFFERSIZE"))
if err != nil {
cfg.ReadBufferSize = readBufSize
}
}

if os.Getenv("SSDB_WRITEBUFFERSIZE") != "" {
writeBufSize, err := strconv.Atoi(os.Getenv("SSDB_WRITEBUFFERSIZE"))
if err != nil {
cfg.WriteBufferSize = writeBufSize
}
}

var cfgs = []*conf.Config{cfg}
err := gossdb.Start(cfgs...)
if err != nil {
return nil, err
}

return gossdb.NewClient()
}

func NewDB(cfg *conf.Config) (database.Storage, error) {
return &Storage{cfg: cfg}, nil
}

func NewOpenDB(cfg *conf.Config) (database.Storage, error) {
db, err := NewDB(cfg)
if err != nil {
return nil, err
}

return db, db.Open()
}
Loading
Loading