Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Breaking up Integration tests #66

Merged
merged 8 commits into from
Nov 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [PR-37](https://github.com/salesforce/storm-dynamic-spout/pull/37) Replaced `SidelineRequestIdentifier` with `FilterChainStepIdentifier` in the FilterChain.
- [PR-37](https://github.com/salesforce/storm-dynamic-spout/pull/37) Added isSidelineStarted() and isSidelineStopped() to the `SidelineController`
- [PR-52](https://github.com/salesforce/storm-dynamic-spout/pull/52) Remove Kafka-Test-Server. Replace with Kafka-JUnit external dependency.
- [PR-66](https://github.com/salesforce/storm-dynamic-spout/pull/66) DynamicSpout.open() now throws IllegalStateException if you attempt to open it more than once.

### Improvements
- [PR-38](https://github.com/salesforce/storm-dynamic-spout/pull/38) Removed unused method `Deserializer.getOutputFields()`.
Expand Down
29 changes: 20 additions & 9 deletions src/main/java/com/salesforce/storm/spout/dynamic/DynamicSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ public DynamicSpout(Map<String, Object> spoutConfig) {
* @param topologyConfig The Storm Topology configuration.
* @param topologyContext The Storm Topology context.
* @param spoutOutputCollector The output collector to emit tuples via.
* @throws IllegalStateException if you attempt to open the spout multiple times.
*/
@Override
public void open(Map topologyConfig, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
if (isOpen) {
logger.warn("This spout has already been opened, cowardly refusing to open it again!");
return;
throw new IllegalStateException("This spout has already been opened.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this is ok, but this is a noteworthy change.

}

// Save references.
Expand Down Expand Up @@ -317,9 +317,9 @@ public void close() {
metricsRecorder = null;
}

if (spoutHandler != null) {
spoutHandler.onSpoutClose(this);
spoutHandler.close();
if (getSpoutHandler() != null) {
getSpoutHandler().onSpoutClose(this);
getSpoutHandler().close();
spoutHandler = null;
}

Expand All @@ -332,8 +332,8 @@ public void close() {
@Override
public void activate() {
logger.debug("Activating spout");
if (spoutHandler != null) {
spoutHandler.onSpoutActivate(this);
if (getSpoutHandler() != null) {
getSpoutHandler().onSpoutActivate(this);
}
}

Expand All @@ -343,8 +343,8 @@ public void activate() {
@Override
public void deactivate() {
logger.debug("Deactivate spout");
if (spoutHandler != null) {
spoutHandler.onSpoutDeactivate(this);
if (getSpoutHandler() != null) {
getSpoutHandler().onSpoutDeactivate(this);
}
}

Expand Down Expand Up @@ -405,6 +405,9 @@ private TopologyContext getTopologyContext() {
/**
* Add a new VirtualSpout to the coordinator, this will get picked up by the coordinator's monitor, opened and
* managed with teh other currently running spouts.
*
* This method is blocking.
*
* @param spout New delegate spout
* @throws SpoutAlreadyExistsException if a spout already exists with the same VirtualSpoutIdentifier.
*/
Expand Down Expand Up @@ -489,6 +492,14 @@ SpoutMessageBus getMessageBus() {
return messageBus;
}

/**
* @return The SpoutHandler implementation.
*/
SpoutHandler getSpoutHandler() {
checkSpoutOpened();
return spoutHandler;
}

/**
* @return The stream that tuples will be emitted out.
*/
Expand Down
Loading