Skip to content

Commit a8c3c8e

Browse files
committed
Support clustered purge
- add 2 APIs(set_purge_seq, get_purge_seq). - keep purge_seq in clouseau. - adjust the commit logic to update purge seq. - update the test code. BugzID: 68280
1 parent 900264f commit a8c3c8e

File tree

3 files changed

+46
-11
lines changed

3 files changed

+46
-11
lines changed

src/main/scala/com/cloudant/clouseau/ClouseauTypeFactory.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ case class UpdateDocMsg(id: String, doc: Document)
4141
case class DeleteDocMsg(id: String)
4242
case class CommitMsg(seq: Long)
4343
case class SetUpdateSeqMsg(seq: Long)
44+
case class SetPurgeSeqMsg(seq: Long)
4445

4546
object ClouseauTypeFactory extends TypeFactory {
4647

@@ -89,6 +90,8 @@ object ClouseauTypeFactory extends TypeFactory {
8990
Some(CommitMsg(toLong(reader.readTerm)))
9091
case ('set_update_seq, 2) =>
9192
Some(SetUpdateSeqMsg(toLong(reader.readTerm)))
93+
case ('set_purge_seq, 2) =>
94+
Some(SetPurgeSeqMsg(toLong(reader.readTerm)))
9295
case _ =>
9396
None
9497
}

src/main/scala/com/cloudant/clouseau/IndexService.scala

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
7070
var reader = DirectoryReader.open(ctx.args.writer, true)
7171
var updateSeq = getCommittedSeq
7272
var pendingSeq = updateSeq
73+
var purgeSeq = getCommittedPurgeSeq
74+
var pendingPurgeSeq = purgeSeq
7375
var committing = false
7476
var forceRefresh = false
7577

@@ -97,6 +99,8 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
9799
group2(request)
98100
case 'get_update_seq =>
99101
('ok, updateSeq)
102+
case 'get_purge_seq =>
103+
('ok, purgeSeq)
100104
case UpdateDocMsg(id: String, doc: Document) =>
101105
logger.debug("Updating %s".format(id))
102106
updateTimer.time {
@@ -117,6 +121,11 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
117121
pendingSeq = newSeq
118122
logger.debug("Pending sequence is now %d".format(newSeq))
119123
'ok
124+
case SetPurgeSeqMsg(newSeq: Long) =>
125+
pendingPurgeSeq = newSeq
126+
logger.debug("Pending purge sequence is now %d".format(newSeq))
127+
commit(pendingSeq, pendingPurgeSeq)
128+
'ok
120129
case 'info =>
121130
('ok, getInfo)
122131
}
@@ -148,12 +157,13 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
148157
}
149158
exit('deleted)
150159
case 'maybe_commit =>
151-
commit(pendingSeq)
152-
case ('committed, newSeq: Long) =>
153-
updateSeq = newSeq
160+
commit(pendingSeq, pendingPurgeSeq)
161+
case ('committed, newUpdateSeq: Long, newPurgeSeq: Long) =>
162+
updateSeq = newUpdateSeq
163+
purgeSeq = newPurgeSeq
154164
forceRefresh = true
155165
committing = false
156-
logger.info("Committed sequence %d".format(newSeq))
166+
logger.debug("Committed sequence %d and %d".format(newUpdateSeq, newPurgeSeq))
157167
case 'commit_failed =>
158168
committing = false
159169
}
@@ -175,18 +185,19 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
175185
}
176186
}
177187

178-
private def commit(newSeq: Long) {
179-
if (!committing && newSeq > updateSeq) {
188+
private def commit(newUpdateSeq: Long, newPurgeSeq: Long) {
189+
if (!committing && (newUpdateSeq > updateSeq || newPurgeSeq > purgeSeq)) {
180190
committing = true
181191
val index = self
182192
node.spawn((_) => {
183193
ctx.args.writer.setCommitData(ctx.args.writer.getCommitData +
184-
("update_seq" -> newSeq.toString))
194+
("update_seq" -> newUpdateSeq.toString) +
195+
("purge_seq" -> newPurgeSeq.toString))
185196
try {
186197
commitTimer.time {
187198
ctx.args.writer.commit()
188199
}
189-
index ! ('committed, newSeq)
200+
index ! ('committed, newUpdateSeq, newPurgeSeq)
190201
} catch {
191202
case e: AlreadyClosedException =>
192203
logger.error("Commit failed to closed writer", e)
@@ -569,7 +580,8 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
569580
('doc_count, reader.numDocs),
570581
('doc_del_count, reader.numDeletedDocs),
571582
('pending_seq, pendingSeq),
572-
('committed_seq, getCommittedSeq)
583+
('committed_seq, getCommittedSeq),
584+
('purge_seq, pendingPurgeSeq)
573585
)
574586
}
575587

@@ -590,6 +602,16 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
590602
}
591603
}
592604

605+
private def getCommittedPurgeSeq = {
606+
val commitData = ctx.args.writer.getCommitData
607+
commitData.get("purge_seq") match {
608+
case null =>
609+
0L
610+
case seq =>
611+
seq.toLong
612+
}
613+
}
614+
593615
private def parseSort(v: Any): Sort = v match {
594616
case 'relevance =>
595617
Sort.RELEVANCE

src/test/scala/com/cloudant/clouseau/IndexServiceSpec.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,15 +399,25 @@ class IndexServiceSpec extends SpecificationWithJUnit {
399399
node.call(service, UpdateDocMsg("bar", doc2)) must be equalTo 'ok
400400
node.call(service, UpdateDocMsg("zzz", doc3)) must be equalTo 'ok
401401

402-
node.call(service, Group1Msg("*:*", "_id", true,"<distance,lon,lat,0.2,57.15,km>", 0, 10)) must beLike {
402+
node.call(service, Group1Msg("*:*", "_id", true, "<distance,lon,lat,0.2,57.15,km>", 0, 10)) must beLike {
403403
case ('ok, List((foo, _), (zzz, _), (bar, _))) => ok
404404
}
405405

406-
node.call(service, Group1Msg("*:*", "_id", true,"<distance,lon,lat,12,57.15,km>", 0, 10)) must beLike {
406+
node.call(service, Group1Msg("*:*", "_id", true, "<distance,lon,lat,12,57.15,km>", 0, 10)) must beLike {
407407
case ('ok, List((bar, _), (zzz, _), (foo, _))) => ok
408408
}
409409
}
410410

411+
"support set/get purge seq" in new index_service {
412+
node.call(service, 'get_purge_seq) must be equalTo ('ok, 0)
413+
node.call(service, SetPurgeSeqMsg(1)) must be equalTo 'ok
414+
Thread.sleep(100)
415+
node.call(service, 'get_purge_seq) must be equalTo ('ok, 1)
416+
node.call(service, SetPurgeSeqMsg(2)) must be equalTo 'ok
417+
Thread.sleep(100)
418+
node.call(service, 'get_purge_seq) must be equalTo ('ok, 2)
419+
}
420+
411421
}
412422

413423
private def isSearchable(node: Node, service: Pid,

0 commit comments

Comments
 (0)