Skip to content

Commit f097569

Browse files
committed
Bounding spouts
1 parent e97eac7 commit f097569

File tree

2 files changed

+9
-8
lines changed
  • components/event-processor

2 files changed

+9
-8
lines changed

components/event-processor/org.wso2.carbon.event.processor.common/src/main/java/org/wso2/carbon/event/processor/common/storm/component/EventReceiverSpout.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import java.util.Arrays;
4242
import java.util.List;
4343
import java.util.Map;
44-
import java.util.concurrent.ConcurrentLinkedQueue;
44+
import java.util.concurrent.LinkedBlockingQueue;
4545

4646
/**
4747
* Receive events from CEP receivers through thrift receiver and pass through
@@ -72,8 +72,7 @@ public class EventReceiverSpout extends BaseRichSpout implements StreamCallback
7272
* this is filled by the receiver thread of data bridge and consumed by the nextTuple which
7373
* runs on the worker thread of spout.
7474
*/
75-
// TODO : Make this queue a fixed size to prevent out of memory issues
76-
private transient ConcurrentLinkedQueue<Event> storedEvents = null;
75+
private transient LinkedBlockingQueue<Event> storedEvents = null;
7776

7877
private SpoutOutputCollector spoutOutputCollector = null;
7978

@@ -125,7 +124,7 @@ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
125124
@Override
126125
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
127126
this.spoutOutputCollector = spoutOutputCollector;
128-
this.storedEvents = new ConcurrentLinkedQueue<Event>();
127+
this.storedEvents = new LinkedBlockingQueue<Event>(stormDeploymentConfig.getStormSpoutBufferSize());
129128

130129
inputThroughputProbe = new ThroughputProbe(logPrefix + "-IN", 10);
131130
outputThroughputProbe = new ThroughputProbe(logPrefix + " -OUT", 10);
@@ -137,7 +136,6 @@ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector
137136
thisHostIp = Utils.findAddress("localhost");
138137
listeningPort = findPort(thisHostIp);
139138
TCPEventServerConfig configs = new TCPEventServerConfig(thisHostIp, listeningPort);
140-
configs.setNumberOfThreads(stormDeploymentConfig.getTransportReceiverThreads());
141139
tcpEventServer = new TCPEventServer(configs, this, null);
142140
for (StreamDefinition siddhiStreamDefinition : incomingStreamDefinitions) {
143141
tcpEventServer.addStreamDefinition(siddhiStreamDefinition);
@@ -187,8 +185,12 @@ public void receive(String streamId, long timestamp, Object[] eventData) {
187185
if (log.isDebugEnabled()) {
188186
log.debug(logPrefix + "Received Event: " + streamId + ":" + Arrays.deepToString(eventData) + "@" + timestamp);
189187
}
190-
storedEvents.add(new Event(timestamp, eventData, streamId));
191-
inputThroughputProbe.update();
188+
try {
189+
storedEvents.put(new Event(timestamp, eventData, streamId));
190+
inputThroughputProbe.update();
191+
} catch (InterruptedException e) {
192+
//ignore
193+
}
192194
}
193195

194196

components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/SiddhiStormOutputEventListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ private void init() {
8181
thisHostIp = HostAddressFinder.findAddress("localhost");
8282
listeningPort = findPort(thisHostIp);
8383
TCPEventServerConfig configs = new TCPEventServerConfig(thisHostIp, listeningPort);
84-
configs.setNumberOfThreads(stormDeploymentConfig.getTransportReceiverThreads());
8584
tcpEventServer = new TCPEventServer(configs, this, connectionCallback);
8685
tcpEventServer.start();
8786
executorService.execute(new Registrar());

0 commit comments

Comments
 (0)