Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions base/collection_xattr_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,14 @@ func UpdateTombstoneXattr(store SubdocXattrStore, k string, xattrKey string, exp
return cas, err
}

// WriteUpdateWithXattr retrieves the existing doc from the bucket, invokes the callback to update the document, then writes the new document to the bucket. Will repeat this process on cas
// failure. If previousValue/xattr/cas are provided, will use those on the first iteration instead of retrieving from the bucket.
// WriteUpdateWithXattr retrieves the existing doc from the bucket, invokes the callback to update
// the document, then writes the new document to the bucket. Will repeat this process on cas
// failure.
//
// If previous document is provided, will use it on 1st iteration instead of retrieving from bucket.
// A zero CAS in `previous` is interpreted as no document existing; this can be used to short-
// circuit the initial Get when the document is unlikely to already exist.

func WriteUpdateWithXattr(store SubdocXattrStore, k string, xattrKey string, userXattrKey string, exp uint32, opts *sgbucket.MutateInOptions, previous *sgbucket.BucketDocument, callback sgbucket.WriteUpdateWithXattrFunc) (casOut uint64, err error) {

var value []byte
Expand All @@ -192,19 +198,21 @@ func WriteUpdateWithXattr(store SubdocXattrStore, k string, xattrKey string, use
var cas uint64
emptyCas := uint64(0)

// If an existing value has been provided, use that as the initial value
if previous != nil && previous.Cas > 0 {
value = previous.Body
xattrValue = previous.Xattr
cas = previous.Cas
userXattrValue = previous.UserXattr
}

for {
var err error
// If no existing value has been provided, retrieve the current value from the bucket
if cas == 0 {
// Load the existing value.
if previous != nil {
// If an existing value has been provided, use that as the initial value.
// A zero CAS is interpreted as no document existing.
if previous.Cas != 0 {
value = previous.Body
xattrValue = previous.Xattr
cas = previous.Cas
userXattrValue = previous.UserXattr
}
previous = nil // a retry will get value from bucket, as below
} else {
// If no existing value has been provided, or on a retry,
// retrieve the current value from the bucket
cas, err = store.SubdocGetBodyAndXattr(k, xattrKey, userXattrKey, &value, &xattrValue, &userXattrValue)

if err != nil {
Expand All @@ -224,7 +232,6 @@ func WriteUpdateWithXattr(store SubdocXattrStore, k string, xattrKey string, use

// If it's an ErrCasFailureShouldRetry, then retry by going back through the for loop
if err == ErrCasFailureShouldRetry {
cas = 0 // force the call to SubdocGetBodyAndXattr() to refresh
continue
}

Expand Down Expand Up @@ -258,7 +265,6 @@ func WriteUpdateWithXattr(store SubdocXattrStore, k string, xattrKey string, use
value = nil
xattrValue = nil
cas = 0

}
}

Expand Down
42 changes: 36 additions & 6 deletions db/blip_collection_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (

// blipSyncCollectionContext stores information about a single collection for a BlipSyncContext
type blipSyncCollectionContext struct {
dbCollection *DatabaseCollection
activeSubChanges base.AtomicBool // Flag for whether there is a subChanges subscription currently active. Atomic access
changesCtxLock sync.Mutex
changesCtx context.Context // Used for the unsub changes Blip message to check if the subChanges feed should stop
changesCtxCancel context.CancelFunc // Cancel function for changesCtx to cancel subChanges being sent
dbCollection *DatabaseCollection
activeSubChanges base.AtomicBool // Flag for whether there is a subChanges subscription currently active. Atomic access
changesCtxLock sync.Mutex
changesCtx context.Context // Used for the unsub changes Blip message to check if the subChanges feed should stop
changesCtxCancel context.CancelFunc // Cancel function for changesCtx to cancel subChanges being sent
pendingInsertionsLock sync.Mutex
pendingInsertions base.Set // DocIDs from handleProposeChanges that aren't in the db

sgr2PullAddExpectedSeqsCallback func(expectedSeqs map[IDAndRev]SequenceID) // sgr2PullAddExpectedSeqsCallback is called after successfully handling an incoming changes message
sgr2PullProcessedSeqCallback func(remoteSeq *SequenceID, idAndRev IDAndRev) // sgr2PullProcessedSeqCallback is called after successfully handling an incoming rev message
Expand All @@ -41,15 +43,43 @@ type blipCollections struct {
sync.RWMutex
}

// Max number of docIDs to keep in pendingInsertions. (Normally items added to this set are
// removed soon thereafter when the client sends the `rev` message; this limit is just to cover
// failure cases where a client never sends the revs, to keep the set from growing w/o bound.)
const kMaxPendingInsertions = 1000

// newBlipSyncCollection constructs a context to hold all blip data for a given collection.
func newBlipSyncCollectionContext(dbCollection *DatabaseCollection) *blipSyncCollectionContext {
c := &blipSyncCollectionContext{
dbCollection: dbCollection,
dbCollection: dbCollection,
pendingInsertions: base.Set{},
}
c.changesCtx, c.changesCtxCancel = context.WithCancel(context.Background())
return c
}

// Remembers a docID that doesn't exist in the collection at the time handleProposeChanges ran.
func (bsc *blipSyncCollectionContext) notePendingInsertion(docID string) {
bsc.pendingInsertionsLock.Lock()
defer bsc.pendingInsertionsLock.Unlock()
if len(bsc.pendingInsertions) < kMaxPendingInsertions {
bsc.pendingInsertions.Add(docID)
} else {
base.WarnfCtx(bsc.changesCtx, "Sync client has more than %d pending doc insertions in collection %q", kMaxPendingInsertions, base.UD(bsc.dbCollection.Name))
}
}

// True if this docID was known not to exist in the collection when handleProposeChanges ran.
// (If so, this fn also forgets the docID, so any subsequent call will return false.)
func (bsc *blipSyncCollectionContext) checkPendingInsertion(docID string) (found bool) {
bsc.pendingInsertionsLock.Lock()
defer bsc.pendingInsertionsLock.Unlock()
if found = bsc.pendingInsertions.Contains(docID); found {
delete(bsc.pendingInsertions, docID)
}
return
}

// setNonCollectionAware adds a single collection matching _default._default collection, to be refered to if no Collection property is set on a blip message.
func (b *blipCollections) setNonCollectionAware(collectionCtx *blipSyncCollectionContext) {
b.Lock()
Expand Down
68 changes: 30 additions & 38 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type blipHandler struct {
*BlipSyncContext
db *Database // Handler-specific copy of the BlipSyncContext's blipContextDb
collection *DatabaseCollectionWithUser // Handler-specific copy of the BlipSyncContext's collection specific DB
collectionCtx *blipSyncCollectionContext // Sync-specific data for this collection
collectionIdx *int // index into BlipSyncContext.collectionMapping for the collection
loggingCtx context.Context // inherited from BlipSyncContext.loggingCtx with additional handler-only information (like keyspace)
serialNumber uint64 // This blip handler's serial number to differentiate logs w/ other handlers
Expand Down Expand Up @@ -152,7 +153,11 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc {
if err != nil {
return err
}
bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.collection.DatabaseCollection))
bh.collectionCtx, err = bh.collections.get(nil)
if err != nil {
bh.collections.setNonCollectionAware(newBlipSyncCollectionContext(bh.collection.DatabaseCollection))
bh.collectionCtx, _ = bh.collections.get(nil)
}
return next(bh, bm)
}
if !bh.collections.hasNamedCollections() {
Expand All @@ -165,12 +170,12 @@ func collectionBlipHandler(next blipHandlerFunc) blipHandlerFunc {
}

bh.collectionIdx = &collectionIndex
collectionCtx, err := bh.collections.get(&collectionIndex)
bh.collectionCtx, err = bh.collections.get(&collectionIndex)
if err != nil {
return base.HTTPErrorf(http.StatusBadRequest, fmt.Sprintf("%s", err))
}
bh.collection = &DatabaseCollectionWithUser{
DatabaseCollection: collectionCtx.dbCollection,
DatabaseCollection: bh.collectionCtx.dbCollection,
user: bh.db.user,
}
bh.loggingCtx = base.CollectionLogCtx(bh.BlipSyncContext.loggingCtx, bh.collection.Name)
Expand Down Expand Up @@ -246,10 +251,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}

// Ensure that only _one_ subChanges subscription can be open on this blip connection at any given time. SG #3222.
collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return base.HTTPErrorf(http.StatusBadRequest, fmt.Sprintf("%s", err))
}
collectionCtx := bh.collectionCtx
collectionCtx.changesCtxLock.Lock()
defer collectionCtx.changesCtxLock.Unlock()
if !collectionCtx.activeSubChanges.CASRetry(false, true) {
Expand Down Expand Up @@ -331,10 +333,7 @@ func (bh *blipHandler) handleSubChanges(rq *blip.Message) error {
}

func (bh *blipHandler) handleUnsubChanges(rq *blip.Message) error {
collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}
collectionCtx := bh.collectionCtx
collectionCtx.changesCtxLock.Lock()
defer collectionCtx.changesCtxLock.Unlock()
collectionCtx.changesCtxCancel()
Expand Down Expand Up @@ -603,11 +602,7 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

collectionCtx := bh.collectionCtx
bh.logEndpointEntry(rq.Profile(), fmt.Sprintf("#Changes:%d", len(changeList)))
if len(changeList) == 0 {
// An empty changeList is sent when a one-shot replication sends its final changes
Expand Down Expand Up @@ -766,7 +761,11 @@ func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {
parentRevID = change[2].(string)
}
status, currentRev := bh.collection.CheckProposedRev(bh.loggingCtx, docID, revID, parentRevID)
if status != 0 {
if status == ProposedRev_OK_IsNew {
// Remember that the doc doesn't exist locally, in order to optimize the upcoming Put:
bh.collectionCtx.notePendingInsertion(docID)
} else if status != ProposedRev_OK {
// Reject the proposed change.
// Skip writing trailing zeroes; but if we write a number afterwards we have to catch up
if nWritten > 0 {
output.Write([]byte(","))
Expand Down Expand Up @@ -860,17 +859,12 @@ func (bh *blipHandler) handleNoRev(rq *blip.Message) error {
base.InfofCtx(bh.loggingCtx, base.KeySyncMsg, "%s: norev for doc %q / %q seq:%q - error: %q - reason: %q",
rq.String(), base.UD(docID), revID, seqStr, rq.Properties[NorevMessageError], rq.Properties[NorevMessageReason])

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seq, err := ParseJSONSequenceID(seqStr)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from norev message: %v - not tracking for checkpointing", seqStr, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}

Expand Down Expand Up @@ -942,19 +936,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

stats.docsPurgedCount.Add(1)
if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seqStr := rq.Properties[RevMessageSequence]
seq, err := ParseJSONSequenceID(seqStr)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqStr, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}
return nil
Expand Down Expand Up @@ -1149,6 +1138,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
newDoc.UpdateBody(body)
}

if rawBucketDoc == nil && bh.collectionCtx.checkPendingInsertion(docID) {
// At the time we handled the `propseChanges` request, there was no doc with this docID
// in the bucket. As an optimization, tell PutExistingRev to assume the doc still doesn't
// exist and bypass getting it from the bucket during the save. If we're wrong, the save
// will fail with a CAS mismatch and the retry will fetch the existing doc.
rawBucketDoc = &sgbucket.BucketDocument{} // empty struct with zero CAS
}

// Finally, save the revision (with the new attachments inline)
// If a conflict resolver is defined for the handler, write with conflict resolution.

Expand All @@ -1164,18 +1161,13 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
return err
}

collectionCtx, err := bh.collections.get(bh.collectionIdx)
if err != nil {
return err
}

if collectionCtx.sgr2PullProcessedSeqCallback != nil {
if bh.collectionCtx.sgr2PullProcessedSeqCallback != nil {
seqProperty := rq.Properties[RevMessageSequence]
seq, err := ParseJSONSequenceID(seqProperty)
if err != nil {
base.WarnfCtx(bh.loggingCtx, "Unable to parse sequence %q from rev message: %v - not tracking for checkpointing", seqProperty, err)
} else {
collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
bh.collectionCtx.sgr2PullProcessedSeqCallback(&seq, IDAndRev{DocID: docID, RevID: revID})
}
}

Expand Down
Loading