-
Notifications
You must be signed in to change notification settings - Fork 3
/
driver_mongo.go
81 lines (71 loc) · 2.62 KB
/
driver_mongo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package siid
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)
// An upsert cannot apply different values for the update case and the insert case:
// https://jira.mongodb.org/browse/SERVER-453
// https://jira.mongodb.org/browse/SERVER-991
// https://docs.mongodb.com/manual/reference/operator/update/setOnInsert/#up._S_setOnInsert
// $setOnInsert cannot be used on the same field that the update itself modifies.
// mongoDriver is the MongoDB-backed value that NewMongoDriver and
// NewMongoDriverWithName return as a Driver.
type mongoDriver struct {
	// dbName and collectionName select the collection used by getCollection.
	dbName, collectionName string
	// client is the caller-supplied connection every operation goes through.
	client *mongo.Client
	// copts is forwarded to each Collection lookup. The constructors in this
	// file never assign it, so it is nil unless set elsewhere.
	copts *options.CollectionOptions
}
// NewMongoDriver returns a Driver backed by client that uses defaultName for
// both the database name and the collection name.
func NewMongoDriver(client *mongo.Client) Driver {
	dbName, collectionName := defaultName, defaultName
	return NewMongoDriverWithName(client, dbName, collectionName)
}
// NewMongoDriverWithName returns a Driver backed by client that operates on
// the collection collectionName inside the database dbName.
// The client is stored as-is; no connection check happens here.
func NewMongoDriverWithName(client *mongo.Client, dbName, collectionName string) Driver {
	return &mongoDriver{
		dbName:         dbName,
		collectionName: collectionName,
		client:         client,
	}
}
// Prepare pings the primary and returns the ping error, if any.
// It performs no other setup.
func (m *mongoDriver) Prepare(ctx context.Context) error {
	return m.pingPrimary(ctx)
}
// Ping reports whether the primary can be reached by returning the error from
// a ping against it (nil on success).
func (m *mongoDriver) Ping(ctx context.Context) error {
	return m.pingPrimary(ctx)
}
// pingPrimary pings the deployment with a primary read preference and returns
// the driver's error unchanged.
//
// The context is first passed through wrapperContext (defined elsewhere in
// this package; presumably it applies a default deadline — confirm).
func (m *mongoDriver) pingPrimary(ctx context.Context) error {
	ctx, cancel := wrapperContext(ctx)
	// Deferred so the derived context is released on every exit path,
	// including a panic inside the driver call.
	defer cancel()

	return m.client.Ping(ctx, readpref.Primary())
}
// Destroy disconnects the underlying mongo client and returns the driver's
// error unchanged.
//
// The client is the one handed to the constructor, so after Destroy the
// caller's client is disconnected as well.
func (m *mongoDriver) Destroy(ctx context.Context) error {
	ctx, cancel := wrapperContext(ctx)
	// Deferred so the derived context is released on every exit path,
	// including a panic inside the driver call.
	defer cancel()

	return m.client.Disconnect(ctx)
}
// getCollection returns a handle to the configured collection, looked up
// afresh from the client on every call with m.copts applied.
func (m *mongoDriver) getCollection() *mongo.Collection {
	db := m.client.Database(m.dbName)
	return db.Collection(m.collectionName, m.copts)
}
// Renew advances the counter stored for domain by quantum and returns the
// counter value from before the increment, as reported by renew.
//
// If renew reports errDomainLost (no document for domain), Renew tries to
// insert a document {_id: domain, current: offset} and then calls renew once
// more; the result of that second call is returned.
func (m *mongoDriver) Renew(ctx context.Context, domain string, quantum, offset uint64) (uint64, error) {
	ctx, cancel := wrapperContext(ctx)
	// Deferred so the derived context is released on every exit path,
	// including a panic inside any of the driver calls below.
	defer cancel()

	curr, err := m.renew(ctx, domain, quantum)
	if err != errDomainLost {
		return curr, err
	}

	// The domain has no document yet: seed it at offset. The insert error is
	// deliberately ignored (presumably a concurrent caller may have created
	// the document first — confirm); the retry below reports any failure
	// that still matters.
	seed := bson.D{{Key: "_id", Value: domain}, {Key: "current", Value: offset}}
	_, _ = m.getCollection().InsertOne(ctx, seed)

	return m.renew(ctx, domain, quantum)
}
// renew issues one FindOneAndUpdate that applies $inc of quantum to the
// "current" field of the document whose _id is domain, with upsert disabled,
// and returns the counter decoded from the document as it was before the
// update.
//
// It returns errDomainLost when the driver reports mongo.ErrNoDocuments, and
// passes any other driver error through unchanged. The value is 0 on error.
func (m *mongoDriver) renew(ctx context.Context, domain string, quantum uint64) (uint64, error) {
	selector := bson.D{{Key: "_id", Value: domain}}
	increment := bson.D{{
		Key:   "$inc",
		Value: bson.D{{Key: "current", Value: quantum}},
	}}
	// Never create the document here, and hand back the pre-increment state.
	opts := new(options.FindOneAndUpdateOptions).
		SetUpsert(false).
		SetReturnDocument(options.Before)

	var before struct{ Current uint64 }
	result := m.getCollection().FindOneAndUpdate(ctx, selector, increment, opts)
	switch err := result.Decode(&before); err {
	case nil:
		return before.Current, nil
	case mongo.ErrNoDocuments:
		return 0, errDomainLost
	default:
		return 0, err
	}
}