Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
*
* @author Mahmoud Ben Hassine
* @author Elimelec Burghelea
* @author Yanming Zhou
* @param <T> type of objects to read
* @since 5.2
*/
public class CompositeItemReader<T> implements ItemStreamReader<T> {

private final List<ItemStreamReader<? extends T>> delegates;

private final Iterator<ItemStreamReader<? extends T>> delegatesIterator;
private @Nullable Iterator<ItemStreamReader<? extends T>> delegatesIterator;

private @Nullable ItemStreamReader<? extends T> currentDelegate;

Expand All @@ -48,27 +49,29 @@ public class CompositeItemReader<T> implements ItemStreamReader<T> {
*/
public CompositeItemReader(List<ItemStreamReader<? extends T>> delegates) {
this.delegates = delegates;
this.delegatesIterator = this.delegates.iterator();
this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
}

// TODO: check if we need to open/close delegates on the fly in read() to avoid
// opening resources early for a long time
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
for (ItemStreamReader<? extends T> delegate : delegates) {

this.delegatesIterator = this.delegates.iterator();
this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;

for (ItemStreamReader<? extends T> delegate : this.delegates) {
delegate.open(executionContext);
}
}

@Override
public @Nullable T read() throws Exception {
if (this.currentDelegate == null) {
if (this.currentDelegate == null || this.delegatesIterator == null) {
return null;
}
T item = currentDelegate.read();
T item = this.currentDelegate.read();
if (item == null) {
currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
return read();
}
return item;
Expand All @@ -89,9 +92,13 @@ public void update(ExecutionContext executionContext) throws ItemStreamException
*/
@Override
public void close() throws ItemStreamException {

this.delegatesIterator = null;
this.currentDelegate = null;

List<Exception> exceptions = new ArrayList<>();

for (ItemStreamReader<? extends T> delegate : delegates) {
for (ItemStreamReader<? extends T> delegate : this.delegates) {
try {
delegate.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@
import org.springframework.batch.infrastructure.item.ExecutionContext;
import org.springframework.batch.infrastructure.item.ItemStreamException;
import org.springframework.batch.infrastructure.item.ItemStreamReader;
import org.springframework.batch.infrastructure.item.support.CompositeItemReader;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

/**
* Test class for {@link CompositeItemReader}.
*
* @author Mahmoud Ben Hassine
* @author Elimelec Burghelea
* @author Yanming Zhou
*/
public class CompositeItemReaderTests {

Expand All @@ -62,6 +63,7 @@ void testCompositeItemReaderRead() throws Exception {
ItemStreamReader<String> reader1 = mock();
ItemStreamReader<String> reader2 = mock();
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
compositeItemReader.open(new ExecutionContext());
when(reader1.read()).thenReturn("foo1", "foo2", null);
when(reader2.read()).thenReturn("bar1", "bar2", null);

Expand All @@ -88,13 +90,15 @@ void testCompositeItemReaderUpdate() {
ItemStreamReader<String> reader2 = mock();
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
ExecutionContext executionContext = new ExecutionContext();
compositeItemReader.open(executionContext);

// when
compositeItemReader.update(executionContext);

// then
verify(reader1).update(executionContext);
verifyNoInteractions(reader2); // reader1 is the current delegate in this setup
verify(reader2, times(0)).update(executionContext); // reader1 is the current
// delegate in this setup
}

@Test
Expand All @@ -103,6 +107,7 @@ void testCompositeItemReaderClose() {
ItemStreamReader<String> reader1 = mock();
ItemStreamReader<String> reader2 = mock();
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
compositeItemReader.open(new ExecutionContext());

// when
compositeItemReader.close();
Expand All @@ -118,6 +123,7 @@ void testCompositeItemReaderCloseWithDelegateThatThrowsException() {
ItemStreamReader<String> reader1 = mock();
ItemStreamReader<String> reader2 = mock();
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
compositeItemReader.open(new ExecutionContext());

doThrow(new ItemStreamException("A failure")).when(reader1).close();

Expand All @@ -135,4 +141,62 @@ void testCompositeItemReaderCloseWithDelegateThatThrowsException() {
verify(reader2).close();
}

@Test
void testCompositeItemReaderRepeatableRead() throws Exception {
// given
ItemStreamReader<String> reader1 = new ItemStreamReader<>() {
int counter = 0;

@Override
public String read() {
return switch (this.counter++) {
case 0 -> "a";
case 1 -> "b";
default -> null;
};
}

@Override
public void close() {
this.counter = 0;
}
};
ItemStreamReader<String> reader2 = new ItemStreamReader<>() {
int counter = 0;

@Override
public String read() {
return switch (this.counter++) {
case 0 -> "c";
case 1 -> "d";
default -> null;
};
}

@Override
public void close() {
this.counter = 0;
}
};
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));

for (int i = 0; i < 5; i++) {
verifyRead(compositeItemReader);
}
}

private void verifyRead(CompositeItemReader<String> compositeItemReader) throws Exception {
// when
compositeItemReader.open(new ExecutionContext());

// then
assertEquals("a", compositeItemReader.read());
assertEquals("b", compositeItemReader.read());
assertEquals("c", compositeItemReader.read());
assertEquals("d", compositeItemReader.read());
assertNull(compositeItemReader.read());

compositeItemReader.close();
}

}