@@ -56,6 +56,7 @@ import scalang.Pid
56
56
import scalang .Reference
57
57
import com .spatial4j .core .context .SpatialContext
58
58
import com .spatial4j .core .distance .DistanceUtils
59
+ import java .util .HashSet
59
60
60
61
case class IndexServiceArgs (config : Configuration , name : String , queryParser : QueryParser , writer : IndexWriter )
61
62
case class HighlightParameters (highlighter : Highlighter , highlightFields : List [String ], highlightNumber : Int , analyzers : List [Analyzer ])
@@ -71,6 +72,7 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
71
72
var pendingSeq = updateSeq
72
73
var committing = false
73
74
var forceRefresh = false
75
+ var idle = true
74
76
75
77
val searchTimer = metrics.timer(" searches" )
76
78
val updateTimer = metrics.timer(" updates" )
@@ -80,45 +82,57 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
80
82
// Start committer heartbeat
81
83
val commitInterval = ctx.args.config.getInt(" commit_interval_secs" , 30 )
82
84
sendEvery(self, ' maybe_commit , commitInterval * 1000 )
85
+ val countFieldsEnabled = ctx.args.config.getBoolean(" clouseau.count_fields" , false )
86
+ send(self, ' count_fields )
87
+
88
+ // Check if the index is idle and optionally close it if there is no activity between
89
+ // Two consecutive idle status checks.
90
+ val closeIfIdleEnabled = ctx.args.config.getBoolean(" clouseau.close_if_idle" , false )
91
+ val idleTimeout = ctx.args.config.getInt(" clouseau.idle_check_interval_secs" , 300 )
92
+ if (closeIfIdleEnabled) {
93
+ sendEvery(self, ' close_if_idle , idleTimeout * 1000 )
94
+ }
83
95
84
96
debug(" Opened at update_seq %d" .format(updateSeq))
85
97
86
98
override def handleCall (tag : (Pid , Reference ), msg : Any ): Any = {
99
+ idle = false
87
100
send(' main , (' touch_lru , ctx.args.name))
88
-
89
- msg match {
90
- case request : SearchRequest =>
91
- search(request)
92
- case Group1Msg (query : String , field : String , refresh : Boolean , groupSort : Any , groupOffset : Int ,
93
- groupLimit : Int ) =>
94
- group1(query, field, refresh, groupSort, groupOffset, groupLimit)
95
- case request : Group2Msg =>
96
- group2(request)
97
- case ' get_update_seq =>
98
- (' ok , updateSeq)
99
- case UpdateDocMsg (id : String , doc : Document ) =>
100
- debug(" Updating %s" .format(id))
101
- updateTimer.time {
102
- ctx.args.writer.updateDocument(new Term (" _id" , id), doc)
103
- }
104
- ' ok
105
- case DeleteDocMsg (id : String ) =>
106
- debug(" Deleting %s" .format(id))
107
- deleteTimer.time {
108
- ctx.args.writer.deleteDocuments(new Term (" _id" , id))
109
- }
110
- ' ok
111
- case CommitMsg (commitSeq : Long ) => // deprecated
112
- pendingSeq = commitSeq
113
- debug(" Pending sequence is now %d" .format(commitSeq))
114
- ' ok
115
- case SetUpdateSeqMsg (newSeq : Long ) =>
116
- pendingSeq = newSeq
117
- debug(" Pending sequence is now %d" .format(newSeq))
118
- ' ok
119
- case ' info =>
120
- (' ok , getInfo)
121
- }
101
+ internalHandleCall(tag, msg)
102
+ }
103
+
104
+ def internalHandleCall (tag : (Pid , Reference ), msg : Any ): Any = msg match {
105
+ case request : SearchRequest =>
106
+ search(request)
107
+ case Group1Msg (query : String , field : String , refresh : Boolean , groupSort : Any , groupOffset : Int ,
108
+ groupLimit : Int ) =>
109
+ group1(query, field, refresh, groupSort, groupOffset, groupLimit)
110
+ case request : Group2Msg =>
111
+ group2(request)
112
+ case ' get_update_seq =>
113
+ (' ok , updateSeq)
114
+ case UpdateDocMsg (id : String , doc : Document ) =>
115
+ debug(" Updating %s" .format(id))
116
+ updateTimer.time {
117
+ ctx.args.writer.updateDocument(new Term (" _id" , id), doc)
118
+ }
119
+ ' ok
120
+ case DeleteDocMsg (id : String ) =>
121
+ debug(" Deleting %s" .format(id))
122
+ deleteTimer.time {
123
+ ctx.args.writer.deleteDocuments(new Term (" _id" , id))
124
+ }
125
+ ' ok
126
+ case CommitMsg (commitSeq : Long ) => // deprecated
127
+ pendingSeq = commitSeq
128
+ debug(" Pending sequence is now %d" .format(commitSeq))
129
+ ' ok
130
+ case SetUpdateSeqMsg (newSeq : Long ) =>
131
+ pendingSeq = newSeq
132
+ debug(" Pending sequence is now %d" .format(newSeq))
133
+ ' ok
134
+ case ' info =>
135
+ (' ok , getInfo)
122
136
}
123
137
124
138
override def handleCast (msg : Any ) = msg match {
@@ -139,6 +153,13 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
139
153
exit(msg)
140
154
case (' close , reason) =>
141
155
exit(reason)
156
+ case (' close_if_idle ) =>
157
+ if (idle) {
158
+ exit(" Idle Timeout" )
159
+ }
160
+ idle = true
161
+ case ' count_fields =>
162
+ countFields
142
163
case ' delete =>
143
164
val dir = ctx.args.writer.getDirectory
144
165
ctx.args.writer.close()
@@ -157,6 +178,25 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
157
178
committing = false
158
179
}
159
180
181
+ def countFields () {
182
+ if (countFieldsEnabled) {
183
+ val leaves = reader.leaves().iterator()
184
+ val warningThreshold = ctx.args.config.
185
+ getInt(" clouseau.field_count_warn_threshold" , 5000 )
186
+ val fields = new HashSet [String ]()
187
+ while (leaves.hasNext() && fields.size <= warningThreshold) {
188
+ val fieldInfoIter = leaves.next.reader().getFieldInfos().iterator()
189
+ while (fieldInfoIter.hasNext() && fields.size <= warningThreshold) {
190
+ fields.add(fieldInfoIter.next().name)
191
+ }
192
+ }
193
+ if (fields.size > warningThreshold) {
194
+ warn(" Index has more than %d fields, " .format(warningThreshold) +
195
+ " too many fields will lead to heap exhuastion" )
196
+ }
197
+ }
198
+ }
199
+
160
200
override def exit (msg : Any ) {
161
201
debug(" Closed with reason: %.1000s" .format(msg))
162
202
try {
0 commit comments