1
0
Fork 0
mirror of https://github.com/anyproto/anytype-heart.git synced 2025-06-10 18:10:49 +09:00

migrate to badger v3

This commit is contained in:
Roman Khafizianov 2022-04-11 19:07:23 +02:00
parent 2548dba949
commit f457ef6616
No known key found for this signature in database
GPG key ID: F07A7D55A2684852
7 changed files with 436 additions and 69 deletions

View file

@ -11,7 +11,6 @@ import (
"github.com/anytypeio/go-anytype-middleware/util/pbtypes"
dsbadgerv3 "github.com/anytypeio/go-ds-badger3"
"github.com/gogo/protobuf/types"
dsbadgerv1 "github.com/ipfs/go-ds-badger"
"math/rand"
"os"
"path/filepath"
@ -67,10 +66,10 @@ func initBadgerV3(o *options) (*dsbadgerv3.Datastore, error) {
return localstoreDS, nil
}
func initBadgerV1(o *options) (*dsbadgerv1.Datastore, error) {
cfg := clientds.DefaultConfig.Logstore
func initBadgerV1(o *options) (*dsbadgerv3.Datastore, error) {
cfg := clientds.DefaultConfig.Localstore
cfg.SyncWrites = o.sync
localstoreDS, err := dsbadgerv1.NewDatastore(filepath.Join(o.path, localstoreDir), &cfg)
localstoreDS, err := dsbadgerv3.NewDatastore(filepath.Join(o.path, localstoreDir), &cfg)
if err != nil {
return nil, err
}

View file

@ -87,20 +87,19 @@ func TestFile(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(1024*1024*3), f.Meta().Size)
}
m, err := getMetrics(filepath.Join(rootPath, coreService.Account(), "ipfslite"))
m, err := getMetrics(filepath.Join(rootPath, coreService.Account(), "ipfslite_v3"))
require.NoError(t, err)
require.Equal(t, 10, m.NumVLOG)
fmt.Printf("BADGER METRICS AFTER ADD: %+v\n", m)
resp := mw.FileListOffload(&pb.RpcFileListOffloadRequest{IncludeNotPinned: true})
require.Equal(t, 0, int(resp.Error.Code), resp.Error.Description)
require.Equal(t, int32(200), resp.FilesOffloaded)
require.Equal(t, int32(201), resp.FilesOffloaded)
require.Equal(t, uint64(1024*1024*3*200+247400), resp.BytesOffloaded) // 247400 is the overhead for the links and meta
m, err = getMetrics(filepath.Join(rootPath, coreService.Account(), "ipfslite"))
m, err = getMetrics(filepath.Join(rootPath, coreService.Account(), "ipfslite_v3"))
require.NoError(t, err)
fmt.Printf("BADGER METRICS AFTER OFFLOAD: %+v\n", m)
require.LessOrEqual(t, m.NumVLOG, 3)
})
t.Run("image_should_open_as_object", func(t *testing.T) {
respUploadImage := mw.UploadFile(&pb.RpcUploadFileRequest{LocalPath: "./block/testdata/testdir/a.jpg"})

9
go.mod
View file

@ -6,7 +6,7 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.0 // indirect
github.com/JohannesKaufmann/html-to-markdown v0.0.0-00010101000000-000000000000
github.com/PuerkitoBio/goquery v1.7.1
github.com/anytypeio/go-ds-badger3 v0.2.8-0.20211208232951-e452447f05c6
github.com/anytypeio/go-ds-badger3 v0.2.8-0.20220408163826-7a36f3a0425d
github.com/anytypeio/go-slip10 v0.0.0-20200330112030-a352ca8495e4
github.com/anytypeio/go-slip21 v0.0.0-20200218204727-e2e51e20ab51
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
@ -24,9 +24,10 @@ require (
github.com/gobwas/glob v0.2.3
github.com/goccy/go-graphviz v0.0.9
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/gosimple/slug v1.12.0 // indirect
github.com/gosimple/slug v1.12.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
@ -49,6 +50,7 @@ require (
github.com/ipfs/go-path v0.0.7
github.com/ipfs/go-unixfs v0.2.5
github.com/ipfs/interface-go-ipfs-core v0.4.0
github.com/jbenet/goprocess v0.1.4
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-libp2p v0.14.3
@ -84,7 +86,6 @@ require (
github.com/tyler-smith/go-bip39 v1.0.1-0.20190808214741-c55f737395bc
github.com/uber/jaeger-client-go v2.28.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible // indirect
github.com/gogo/status v1.1.0
github.com/yuin/goldmark v1.4.0
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
@ -102,6 +103,8 @@ require (
nhooyr.io/websocket v1.8.7 // indirect
)
replace github.com/dgraph-io/badger/v3 => github.com/anytypeio/badger/v3 v3.2103.3-0.20220408162140-5fddf59954d6
replace github.com/JohannesKaufmann/html-to-markdown => github.com/anytypeio/html-to-markdown v0.0.0-20200617145221-2afd2a14bae1
replace github.com/textileio/go-threads => github.com/anytypeio/go-threads v1.1.0-rc1.0.20220223104843-a67245cee80e

12
go.sum
View file

@ -79,10 +79,10 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/anytypeio/amplitude-go v0.0.0-20211130222238-8d16496a9b31 h1:fCXBjRAxXq4pKyfH8xmYPfjL/E6v5zTTxYrIKDz8ufw=
github.com/anytypeio/amplitude-go v0.0.0-20211130222238-8d16496a9b31/go.mod h1:uX6FcwR+wTQWzFszLXQxeKfNeittg1408V7pVAtLKqQ=
github.com/anytypeio/badger/v3 v3.2103.3-0.20211208214531-90d37368c220 h1:Re2YR8w2xLuQisVfEqLQP2aqAbKsVUnsgBCWcJXfQeY=
github.com/anytypeio/badger/v3 v3.2103.3-0.20211208214531-90d37368c220/go.mod h1:/7Bz+MdP5VAvsg5XmFieHsCgj9P689gBq8ho0Cv/WiE=
github.com/anytypeio/go-ds-badger3 v0.2.8-0.20211208232951-e452447f05c6 h1:XJyExPcF1Zq4Fx5vi7/g8EdMfBKKIJXzUSbfDUSyC/I=
github.com/anytypeio/go-ds-badger3 v0.2.8-0.20211208232951-e452447f05c6/go.mod h1:Xv0XbZm3cAt74F2bz0/XrrpJpsDnkBJ4TM8gpim6oH4=
github.com/anytypeio/badger/v3 v3.2103.3-0.20220408162140-5fddf59954d6 h1:HabDG7yUniImgiPLwb32foO8e9zjwXWszdJK8+j+rBI=
github.com/anytypeio/badger/v3 v3.2103.3-0.20220408162140-5fddf59954d6/go.mod h1:j6qVNdAoWIfOaRaFkYHU8dc2OB2fBWcUKJwt3OxIIRs=
github.com/anytypeio/go-ds-badger3 v0.2.8-0.20220408163826-7a36f3a0425d h1:USIASm5/+Ys56mfDO36oujdzeojbsRk+5aqTQ1/JQS8=
github.com/anytypeio/go-ds-badger3 v0.2.8-0.20220408163826-7a36f3a0425d/go.mod h1:2jehJ1Tq/vu6RM0iPAkvPNr5ZDE9ScM2ZUfT+XXAZYw=
github.com/anytypeio/go-gelf v0.0.0-20210418191311-774bd5b016e7 h1:YBmcug6mOdwjJcmzA/Fpjtl2oo78+fMlr5Xj0dzwg78=
github.com/anytypeio/go-gelf v0.0.0-20210418191311-774bd5b016e7/go.mod h1:EuPAem8b51iulSY3wBQJc0Shz+A2DmyREddzDt0fywM=
github.com/anytypeio/go-log/v2 v2.1.2-0.20200810212702-264b187bb04f h1:aeyycLTPbhwjW2x/EkmuAgRzdX0bdwy+RStmevhQbhM=
@ -238,10 +238,7 @@ github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6ps
github.com/dgraph-io/badger v1.6.1/go.mod h1:FRmFw3uxvcpa8zG3Rxs0th+hCLIuaQg8HlNV5bjgnuU=
github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8=
github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE=
github.com/dgraph-io/badger/v3 v3.2103.2 h1:dpyM5eCJAtQCBcMCZcT4UBZchuTJgCywerHHgmxfxM8=
github.com/dgraph-io/badger/v3 v3.2103.2/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f h1:NBGp2JpfMtXmanFWt6f3gEdBtnLO5LupRvm3w4TXrvs=
github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
@ -1607,7 +1604,6 @@ golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPI
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=

View file

@ -3,6 +3,7 @@ package clientds
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-middleware/pkg/lib/datastore/multids"
"os"
"path/filepath"
"sort"
@ -27,11 +28,12 @@ import (
)
const (
CName = "datastore"
liteDSDir = "ipfslite"
logstoreDSDir = "logstore"
localstoreDSDir = "localstore"
threadsDbDSDir = "collection" + string(os.PathSeparator) + "eventstore"
CName = "datastore"
liteOldDSDir = "ipfslite" // used as a fallback for the existing repos
liteDSDir = "ipfslite_v3"
logstoreOldDSDir = "logstore" // used for migration to the localstoreDSDir and then removed
localstoreDSDir = "localstore"
threadsDbDSDir = "collection" + string(os.PathSeparator) + "eventstore"
valueLogExtenderKey = "_extend"
valueLogExtenderSize = 1024
@ -40,28 +42,34 @@ const (
var log = logging.Logger("anytype-clientds")
type clientds struct {
running bool
litestoreDS *dsbadgerv1.Datastore
logstoreDS *dsbadgerv1.Datastore
localstoreDS *dsbadgerv3.Datastore
threadsDbDS *textileBadger.Datastore
cfg Config
repoPath string
migrations []migration
running bool
litestoreOldDS *dsbadgerv1.Datastore
litestoreDS *dsbadgerv3.Datastore
litestoreCombinedDS ds.Batching
logstoreOldDS *dsbadgerv1.Datastore // logstore moved to localstoreDS
localstoreDS *dsbadgerv3.Datastore
threadsDbDS *textileBadger.Datastore
cfg Config
repoPath string
migrations []migration
}
type Config struct {
Litestore dsbadgerv1.Options
Logstore dsbadgerv1.Options
Litestore dsbadgerv3.Options
LitestoreOld dsbadgerv1.Options
LogstoreOld dsbadgerv1.Options
Localstore dsbadgerv3.Options
TextileDb dsbadgerv1.Options
}
var DefaultConfig = Config{
Litestore: dsbadgerv1.DefaultOptions,
Logstore: dsbadgerv1.DefaultOptions,
TextileDb: dsbadgerv1.DefaultOptions,
Localstore: dsbadgerv3.DefaultOptions,
Litestore: dsbadgerv3.DefaultOptions,
LitestoreOld: dsbadgerv1.DefaultOptions,
LogstoreOld: dsbadgerv1.DefaultOptions,
TextileDb: dsbadgerv1.DefaultOptions,
Localstore: dsbadgerv3.DefaultOptions,
}
type DSConfigGetter interface {
@ -74,14 +82,16 @@ type migration struct {
}
func init() {
// lets set badger options inside the init, otherwise we need to directly import the badger intp MW
DefaultConfig.Logstore.ValueLogFileSize = 64 * 1024 * 1024 // Badger will rotate value log files after 64MB. GC only works starting from the 2nd value log file
DefaultConfig.Logstore.GcDiscardRatio = 0.2 // allow up to 20% value log overhead
DefaultConfig.Logstore.GcInterval = time.Minute * 10 // run GC every 10 minutes
DefaultConfig.Logstore.GcSleep = time.Second * 5 // sleep between rounds of one GC cycle(it has multiple rounds within one cycle)
DefaultConfig.Logstore.ValueThreshold = 1024 // store up to 1KB of value within the LSM tree itself to speed-up details filter queries
DefaultConfig.Logstore.Logger = logging.Logger("badger-logstore")
// let's set badger options inside the init, otherwise we need to directly import badger into MW
DefaultConfig.LogstoreOld.ValueLogFileSize = 64 * 1024 * 1024 // Badger will rotate value log files after 64MB. GC only works starting from the 2nd value log file
DefaultConfig.LogstoreOld.GcDiscardRatio = 0.2 // allow up to 20% value log overhead
DefaultConfig.LogstoreOld.GcInterval = time.Minute * 10 // run GC every 10 minutes
DefaultConfig.LogstoreOld.GcSleep = time.Second * 5 // sleep between rounds of one GC cycle(it has multiple rounds within one cycle)
DefaultConfig.LogstoreOld.ValueThreshold = 1024 // store up to 1KB of value within the LSM tree itself to speed-up details filter queries
DefaultConfig.LogstoreOld.Logger = logging.Logger("badger-logstore-old")
// used to store objects localstore + threads logs info
DefaultConfig.Localstore.MemTableSize = 16 * 1024 * 1024 // Memtable saves all values below value threshold + write ahead log, actual file size is 2x the amount, the size is preallocated
DefaultConfig.Localstore.ValueLogFileSize = 16 * 1024 * 1024 // Vlog has all values more than value threshold, actual file uses 2x the amount, the size is preallocated
DefaultConfig.Localstore.GcDiscardRatio = 0.2 // allow up to 20% value log overhead
@ -91,9 +101,19 @@ func init() {
DefaultConfig.Localstore.Logger = logging.Logger("badger-localstore")
DefaultConfig.Localstore.SyncWrites = false
DefaultConfig.Litestore.MemTableSize = 64 * 1024 * 1024 // Memtable saves all values below value threshold + write ahead log, actual file size is 2x the amount, the size is preallocated
DefaultConfig.Litestore.ValueLogFileSize = 64 * 1024 * 1024 // Vlog has all values more than value threshold, actual file uses 2x the amount, the size is preallocated
DefaultConfig.Litestore.GcDiscardRatio = 0.2 // allow up to 20% value log overhead
DefaultConfig.Litestore.GcInterval = time.Minute * 10 // run GC every 10 minutes
DefaultConfig.Litestore.GcSleep = time.Second * 5 // sleep between rounds of one GC cycle(it has multiple rounds within one cycle)
DefaultConfig.Litestore.ValueThreshold = 1024
DefaultConfig.Litestore.Logger = logging.Logger("badger-litestore")
DefaultConfig.Litestore.ValueLogFileSize = 64 * 1024 * 1024
DefaultConfig.Litestore.GcDiscardRatio = 0.1
DefaultConfig.Litestore.SyncWrites = false
DefaultConfig.LitestoreOld.Logger = logging.Logger("badger-litestore-old")
DefaultConfig.LitestoreOld.ValueLogFileSize = 64 * 1024 * 1024
DefaultConfig.LitestoreOld.GcDiscardRatio = 0.1
DefaultConfig.TextileDb.Logger = logging.Logger("badger-textiledb")
// we don't need to tune litestore&threadsDB badger instances because they should be fine with defaults for now
}
@ -120,20 +140,38 @@ func (r *clientds) Init(a *app.App) (err error) {
migrationFunc: r.migrateFileStoreAndIndexesBadger,
migrationKey: ds.NewKey("/migration/localstore/badgerv3/filesindexes"),
},
{
migrationFunc: r.migrateLogstore,
migrationKey: ds.NewKey("/migration/logstore/badgerv3"),
},
}
return nil
}
func (r *clientds) Run() error {
var err error
r.litestoreDS, err = dsbadgerv1.NewDatastore(filepath.Join(r.repoPath, liteDSDir), &r.cfg.Litestore)
r.litestoreDS, err = dsbadgerv3.NewDatastore(filepath.Join(r.repoPath, liteDSDir), &r.cfg.Litestore)
if err != nil {
return err
}
r.logstoreDS, err = dsbadgerv1.NewDatastore(filepath.Join(r.repoPath, logstoreDSDir), &r.cfg.Logstore)
if err != nil {
return err
litestoreOldPath := filepath.Join(r.repoPath, liteOldDSDir)
if _, err := os.Stat(litestoreOldPath); !os.IsNotExist(err) {
r.litestoreOldDS, err = dsbadgerv1.NewDatastore(litestoreOldPath, &r.cfg.LitestoreOld)
if err != nil {
return err
}
r.litestoreCombinedDS = multids.New(r.litestoreDS, r.litestoreOldDS)
}
logstoreOldDSDirPath := filepath.Join(r.repoPath, logstoreOldDSDir)
if _, err := os.Stat(logstoreOldDSDirPath); !os.IsNotExist(err) {
r.logstoreOldDS, err = dsbadgerv1.NewDatastore(logstoreOldDSDirPath, &r.cfg.LogstoreOld)
if err != nil {
return err
}
}
r.localstoreDS, err = dsbadgerv3.NewDatastore(filepath.Join(r.repoPath, localstoreDSDir), &r.cfg.Localstore)
@ -185,11 +223,14 @@ func (r *clientds) migrateIfNeeded() error {
return nil
}
func (r *clientds) migrateWithKey(chooseKey func(item *dgraphbadgerv1.Item) bool) error {
s := r.logstoreDS.DB.NewStream()
func (r *clientds) migrateWithKey(from *dsbadgerv1.Datastore, to *dsbadgerv3.Datastore, chooseKey func(item *dgraphbadgerv1.Item) bool) error {
if from == nil {
return fmt.Errorf("from ds is nil")
}
s := from.DB.NewStream()
s.ChooseKey = chooseKey
s.Send = func(list *dgraphbadgerv1pb.KVList) error {
batch, err := r.localstoreDS.Batch()
batch, err := to.Batch()
if err != nil {
return err
}
@ -205,7 +246,10 @@ func (r *clientds) migrateWithKey(chooseKey func(item *dgraphbadgerv1.Item) bool
}
func (r *clientds) migrateLocalStoreBadger() error {
return r.migrateWithKey(func(item *dgraphbadgerv1.Item) bool {
if r.logstoreOldDS == nil {
return nil
}
return r.migrateWithKey(r.logstoreOldDS, r.localstoreDS, func(item *dgraphbadgerv1.Item) bool {
keyString := string(item.Key())
res := strings.HasPrefix(keyString, "/pages") ||
strings.HasPrefix(keyString, "/workspaces") ||
@ -215,20 +259,53 @@ func (r *clientds) migrateLocalStoreBadger() error {
}
func (r *clientds) migrateFileStoreAndIndexesBadger() error {
return r.migrateWithKey(func(item *dgraphbadgerv1.Item) bool {
if r.logstoreOldDS == nil {
return nil
}
return r.migrateWithKey(r.logstoreOldDS, r.localstoreDS, func(item *dgraphbadgerv1.Item) bool {
keyString := string(item.Key())
return strings.HasPrefix(keyString, "/files") || strings.HasPrefix(keyString, "/idx")
})
}
func (r *clientds) migrateLogstore() error {
if r.logstoreOldDS == nil {
return nil
}
return r.migrateWithKey(r.logstoreOldDS, r.localstoreDS, func(item *dgraphbadgerv1.Item) bool {
keyString := string(item.Key())
return strings.HasPrefix(keyString, "/thread")
})
}
type ValueLogInfo struct {
Index int64
Size int64
}
func (r *clientds) RunBlockstoreGC() (freed int64, err error) {
if r.litestoreOldDS != nil {
freed1, err := runBlockstoreGC(filepath.Join(r.repoPath, liteOldDSDir), r.litestoreOldDS, DefaultConfig.LitestoreOld.ValueLogFileSize)
if err != nil {
return 0, err
}
freed += freed1
}
if r.litestoreDS != nil {
freed2, err := runBlockstoreGC(filepath.Join(r.repoPath, liteDSDir), r.litestoreDS, DefaultConfig.Litestore.ValueLogFileSize)
if err != nil {
return 0, err
}
freed += freed2
}
return
}
func runBlockstoreGC(dsPath string, dsInstance ds.Datastore, valueLogSize int64) (freed int64, err error) {
getValueLogsInfo := func() (totalSize int64, valLogs []*ValueLogInfo, err error) {
err = filepath.Walk(filepath.Join(r.repoPath, liteDSDir), func(_ string, info os.FileInfo, err error) error {
err = filepath.Walk(dsPath, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
@ -264,17 +341,23 @@ func (r *clientds) RunBlockstoreGC() (freed int64, err error) {
return 0, nil
}
if valLogs[len(valLogs)-1].Size > DefaultConfig.Litestore.ValueLogFileSize {
if valLogs[len(valLogs)-1].Size > valueLogSize {
// in case we have the last value log exceeding the max value log size
v := make([]byte, valueLogExtenderSize)
r.litestoreDS.Put(ds.NewKey(valueLogExtenderKey), v)
_ = dsInstance.Put(ds.NewKey(valueLogExtenderKey), v)
}
var total int
var maxErrors = 1
for {
if v1, ok := dsInstance.(*dsbadgerv1.Datastore); ok {
err = v1.DB.RunValueLogGC(0.000000000001)
} else if v3, ok := dsInstance.(*dsbadgerv3.Datastore); ok {
err = v3.DB.RunValueLogGC(0.000000000001)
} else {
panic("badger version unsupported")
}
// set the discard ratio to the lowest value means we want to rewrite value log if we have any values removed
err = r.litestoreDS.DB.RunValueLogGC(0.000000000001)
if err != nil && err.Error() == "Value log GC attempt didn't result in any cleanup" {
maxErrors--
if maxErrors == 0 {
@ -288,7 +371,7 @@ func (r *clientds) RunBlockstoreGC() (freed int64, err error) {
totalSizeAfter, vlogsAfter, err := getValueLogsInfo()
results, err := r.litestoreDS.Query(query.Query{Limit: 0, KeysOnly: true, ReturnsSizes: true})
results, err := dsInstance.Query(query.Query{Limit: 0, KeysOnly: true, ReturnsSizes: true})
var (
keysTotal int64
keysTotalSize int64
@ -313,6 +396,11 @@ func (r *clientds) PeerstoreDS() (ds.Batching, error) {
if !r.running {
return nil, fmt.Errorf("exact ds may be requested only after Run")
}
if r.litestoreCombinedDS != nil {
return r.litestoreCombinedDS, nil
}
return r.litestoreDS, nil
}
@ -320,6 +408,11 @@ func (r *clientds) BlockstoreDS() (ds.Batching, error) {
if !r.running {
return nil, fmt.Errorf("exact ds may be requested only after Run")
}
if r.litestoreCombinedDS != nil {
return r.litestoreCombinedDS, nil
}
return r.litestoreDS, nil
}
@ -327,7 +420,7 @@ func (r *clientds) LogstoreDS() (datastore.DSTxnBatching, error) {
if !r.running {
return nil, fmt.Errorf("exact ds may be requested only after Run")
}
return r.logstoreDS, nil
return r.localstoreDS, nil
}
func (r *clientds) ThreadsDbDS() (keytransform.TxnDatastoreExtended, error) {
@ -349,13 +442,6 @@ func (r *clientds) Name() (name string) {
}
func (r *clientds) Close() (err error) {
if r.logstoreDS != nil {
err2 := r.logstoreDS.Close()
if err2 != nil {
err = multierror.Append(err, err2)
}
}
if r.litestoreDS != nil {
err2 := r.litestoreDS.Close()
if err2 != nil {

View file

@ -0,0 +1,139 @@
package multids
import (
"github.com/hashicorp/go-multierror"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
)
// multiDs layers a new datastore over an old one: reads consult the
// new store first and fall back to the old; writes go to the new store only.
type multiDs struct {
	oldDs ds.Batching
	newDs ds.Batching
}
// multiBatch pairs a batch from each underlying datastore so that
// deletes can be applied to both while puts target only the new store.
type multiBatch struct {
	oldDs ds.Batch
	newDs ds.Batch
}
// Get returns the value stored for key. The new datastore is consulted
// first; only when it reports ds.ErrNotFound is the old one queried.
func (d multiDs) Get(key ds.Key) (value []byte, err error) {
	value, err = d.newDs.Get(key)
	if err == ds.ErrNotFound {
		return d.oldDs.Get(key)
	}
	return value, err
}
// Has reports whether key exists in either datastore, checking the
// new store first and falling back to the old one.
func (d multiDs) Has(key ds.Key) (exists bool, err error) {
	exists, err = d.newDs.Has(key)
	if err != nil || exists {
		return exists, err
	}
	return d.oldDs.Has(key)
}
// GetSize returns the stored value's size for key, preferring the new
// datastore and falling back to the old one on ds.ErrNotFound.
func (d multiDs) GetSize(key ds.Key) (size int, err error) {
	size, err = d.newDs.GetSize(key)
	if err == ds.ErrNotFound {
		return d.oldDs.GetSize(key)
	}
	return size, err
}
// Query runs q against both datastores and merges the two result
// streams into a single query.Results (new store's results first).
func (d multiDs) Query(q query.Query) (query.Results, error) {
	newRes, err := d.newDs.Query(q)
	if err != nil {
		return nil, err
	}
	oldRes, err := d.oldDs.Query(q)
	if err != nil {
		return nil, err
	}
	return newResultsCombiner(newRes, oldRes), nil
}
// Put writes key/value to the new datastore only; the old store is
// read-only from this facade's point of view.
func (d multiDs) Put(key ds.Key, value []byte) error {
	return d.newDs.Put(key, value)
}
// Delete removes key from the new datastore; if the new store reports
// ds.ErrNotFound the delete is retried against the old store so the
// stale value cannot resurface through the fallback read path.
func (d multiDs) Delete(key ds.Key) error {
	err := d.newDs.Delete(key)
	if err == ds.ErrNotFound {
		return d.oldDs.Delete(key)
	}
	return err
}
// Sync flushes prefix on the new datastore first, then on the old one,
// returning the first error encountered.
func (d multiDs) Sync(prefix ds.Key) error {
	if err := d.newDs.Sync(prefix); err != nil {
		return err
	}
	return d.oldDs.Sync(prefix)
}
// Close closes both underlying datastores (old first, matching the
// original order) and reports every failure.
//
// Fix: the previous version returned only one of the two errors when
// both Close calls failed (the old store's error was silently dropped
// whenever the new store also failed). Aggregate with multierror so
// neither is lost.
func (d multiDs) Close() error {
	var merr multierror.Error
	if err := d.oldDs.Close(); err != nil {
		merr.Errors = append(merr.Errors, err)
	}
	if err := d.newDs.Close(); err != nil {
		merr.Errors = append(merr.Errors, err)
	}
	return merr.ErrorOrNil()
}
// Batch creates a batch on each underlying datastore and returns a
// combined batch that routes operations appropriately.
func (d multiDs) Batch() (ds.Batch, error) {
	ob, err := d.oldDs.Batch()
	if err != nil {
		return nil, err
	}
	nb, err := d.newDs.Batch()
	if err != nil {
		return nil, err
	}
	return newMultiBatch(nb, ob), nil
}
// New returns a ds.Batching facade over newDs and oldDs: reads fall
// back from newDs to oldDs, writes go to newDs only.
func New(newDs ds.Batching, oldDs ds.Batching) ds.Batching {
	return &multiDs{oldDs: oldDs, newDs: newDs}
}
// newMultiBatch wraps a batch from each store into a single ds.Batch.
func newMultiBatch(newDs ds.Batch, oldDs ds.Batch) ds.Batch {
	return &multiBatch{oldDs: oldDs, newDs: newDs}
}
// Put stages a write for key. Batched writes, like direct Puts, go
// only to the new datastore.
func (m multiBatch) Put(key ds.Key, value []byte) error {
	// put only to the new ds
	return m.newDs.Put(key, value)
}
// Delete stages the removal of key in BOTH batches, so a value deleted
// from the new store cannot reappear via the old-store fallback read.
func (m multiBatch) Delete(key ds.Key) error {
	if err := m.oldDs.Delete(key); err != nil {
		return err
	}
	return m.newDs.Delete(key)
}
// Commit applies both staged batches (old first, then new), always
// attempting both, and reports every commit failure.
//
// Fix: the previous version appended BOTH err1 and err2 into the
// multierror even when one of them was nil, so a single failure
// produced an aggregated message containing a spurious "<nil>" entry.
// Append only non-nil errors.
func (m multiBatch) Commit() error {
	var merr multierror.Error
	if err := m.oldDs.Commit(); err != nil {
		merr.Errors = append(merr.Errors, err)
	}
	if err := m.newDs.Commit(); err != nil {
		merr.Errors = append(merr.Errors, err)
	}
	return merr.ErrorOrNil()
}

View file

@ -0,0 +1,145 @@
package multids
import (
"context"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"
"sync"
)
// newResultsCombiner returns combined results. Order is not guaranteed;
// under racy conditions it can return one extra entry beyond the limit.
func newResultsCombiner(results ...query.Results) query.Results {
	r := &resultsCombiner{results: results}
	r.ctx, r.cancel = context.WithCancel(context.Background())
	return r
}
// resultsCombiner merges several query.Results streams into one.
// ctx/cancel stop any in-flight forwarding goroutines on Close.
type resultsCombiner struct {
	ctx     context.Context
	cancel  context.CancelFunc
	results []query.Results
}
// Query returns the query of the first underlying result set; all
// combined sets are assumed to have been produced from the same query.
func (r resultsCombiner) Query() query.Query {
	return r.results[0].Query()
}
// nextUnordered fans results from all underlying streams into a single
// channel, in arbitrary order, enforcing the query's Limit via a
// mutex-guarded counter.
//
// NOTE(review): each worker goroutine executes a single select, so it
// forwards at most ONE result per underlying stream — presumably this
// was meant to loop over res.Next(); confirm intended behavior.
// NOTE(review): `out` is never closed, so a consumer ranging over it
// blocks forever once the last value is delivered.
// NOTE(review): go-datastore uses Limit == 0 to mean "no limit"; here
// `total >= limit` is immediately true for limit 0, dropping every
// result — confirm callers always set a positive limit.
// NOTE(review): the `break` statements exit only the select, not the
// goroutine's (nonexistent) loop; they are effectively no-ops here.
func (r resultsCombiner) nextUnordered() <-chan query.Result {
	out := make(chan query.Result)
	var (
		total          int
		m              sync.Mutex
		limitExhausted = make(chan struct{})
	)
	limit := r.results[0].Query().Limit
	for _, results := range r.results {
		go func(res query.Results) {
			var more bool
			var v query.Result
			select {
			case v, more = <-res.Next():
				if !more {
					break
				}
				m.Lock()
				if total >= limit {
					m.Unlock()
					break
				}
				total++
				if total+1 == limit {
					close(limitExhausted)
				}
				m.Unlock()
				out <- v
			case <-r.ctx.Done():
				break
			case <-limitExhausted:
				break
			}
		}(results)
	}
	return out
}
// Next returns a channel of combined results. For unordered queries it
// streams from all sources concurrently via nextUnordered; for ordered
// queries it materializes everything with Rest, sorts, and replays the
// entries over the channel.
//
// Fix: the previous version never closed `out` — neither after the
// error send nor after the last entry — so consumers ranging over the
// channel blocked forever; and on ctx cancellation the send loop kept
// iterating instead of stopping. The channel is now always closed and
// the loop exits on cancellation.
func (r resultsCombiner) Next() <-chan query.Result {
	if len(r.results[0].Query().Orders) == 0 {
		return r.nextUnordered()
	}
	out := make(chan query.Result)
	entries, err := r.Rest()
	if err != nil {
		go func() {
			defer close(out)
			out <- query.Result{
				Entry: query.Entry{},
				Error: err,
			}
		}()
		return out
	}
	go func() {
		defer close(out)
		for _, entry := range entries {
			select {
			case out <- query.Result{Entry: entry}:
			case <-r.ctx.Done():
				return
			}
		}
	}()
	return out
}
// NextSync returns the next result synchronously, draining the
// underlying result sets in order: later sets are only consulted once
// the earlier ones are exhausted.
func (r resultsCombiner) NextSync() (query.Result, bool) {
	for _, res := range r.results {
		if v, ok := res.NextSync(); ok {
			return v, ok
		}
	}
	return query.Result{}, false
}
// Rest collects all remaining entries from every underlying result
// set, sorts them by the query's Orders, and applies the query Limit.
//
// Fix: go-datastore uses Limit == 0 to mean "no limit", but the
// previous version compared len(entries) > Limit unconditionally and
// therefore truncated EVERY result set to zero entries for unlimited
// queries. The limit is now applied only when positive.
func (r resultsCombiner) Rest() ([]query.Entry, error) {
	entries := make([]query.Entry, 0)
	for _, res := range r.results {
		part, err := res.Rest()
		if err != nil {
			return nil, err
		}
		entries = append(entries, part...)
	}
	q := r.results[0].Query()
	query.Sort(q.Orders, entries)
	if q.Limit > 0 && len(entries) > q.Limit {
		entries = entries[:q.Limit]
	}
	return entries, nil
}
// Close cancels any in-flight forwarding and closes every underlying
// result set, aggregating all close errors.
func (r resultsCombiner) Close() error {
	r.cancel()
	merr := multierror.Error{}
	for _, res := range r.results {
		if cerr := res.Close(); cerr != nil {
			merr.Errors = append(merr.Errors, cerr)
		}
	}
	return merr.ErrorOrNil()
}
// Process is part of the query.Results interface.
// NOTE(review): returns nil (marked todo) — any caller invoking methods
// on the returned goprocess.Process will nil-panic; confirm no caller
// relies on it before shipping.
func (r resultsCombiner) Process() goprocess.Process {
	// todo
	return nil
}