Skip to content

Sequential Aggregate Reader #4777

Open
Open
@mcherb

Description

@mcherb

I have written once a reader that I want to share (I can create a PR if you like it):

The idea of this reader is to read an ordered set of data and to group items based on a condition. Exemple :

Given :

fk data
1 data-1
1 data-2
2 data-1
2 data-2
3 data-1

Based on the fk column, the reader must give me :

Aggregate : (1, {data-1, data-2})
Aggregate : (2, {data-1, data-2})
Aggregate : (3, {data-1})

Implementation (Not thread safe) :

//Implemented by the model to tell us how to aggregate, which line match the next one
public interface AggregateMatcher<T> {
    boolean match(T t);
}
//The aggregate model
public class Aggregate<T> implements Iterable<T> {

    private final List<T> items = new ArrayList<>();

    public static <E> Aggregate<E> of(E item) {
        return new Aggregate<>(item);
    }

    @SafeVarargs
    public static <E> Aggregate<E> of(E... item) {
        Aggregate<E> aggregate = new Aggregate<>();
        Arrays.stream(item).forEach(aggregate::add);
        return aggregate;
    }
    
    private Aggregate() {
    }

    private Aggregate(T first) {
        add(first);
    }

    public Aggregate<T> add(T item) {
        items.add(item);
        return this;
    }

    public boolean isEmpty() {
        return items.isEmpty();
    }

    @Override
    public Iterator<T> iterator() {
        return items.iterator();
    }

    public Stream<T> stream() {
        return items.stream();
    }

    public int size() {
        return items.size();
    }
}
public class AggregateReader<T extends AggregateMatcher<T>> implements ItemReader<Aggregate<T>>, ItemStream {

    private final PeekableItemReader<T> delegate;

    public AggregateReader(PeekableItemReader<T> delegate) {
        this.delegate = delegate;
    }

    @Nullable
    @Override
    public Aggregate<T> read() throws Exception {

        T item = delegate.read();
        if (item == null) {
            return null;
        }

        Aggregate<T> aggregate = Aggregate.of(item);
        T candidate;
        while ((candidate = delegate.peek()) != null && candidate.match(item)) {
            aggregate.add(delegate.read());
        }

        return aggregate;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public void close() throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }
}

And finally the test class:

class AggregateReaderTest {

    @Test
    void should_read_multiple_aggregates_when_multiple_ides_found() throws Exception {

        AggregateReader<SomeData> reader = aggregateReader(List.of(
                SomeData.of(1, "1"),
                SomeData.of(1, "2"),
                SomeData.of(2, "1"),
                SomeData.of(2, "2"),
                SomeData.of(2, "3"),
                SomeData.of(2, "4"),
                SomeData.of(3, "1"),
                SomeData.of(3, "2")
        ));

        assertThat(reader.read())
                .containsExactly(
                        SomeData.of(1, "1"),
                        SomeData.of(1, "2")
                );
        assertThat(reader.read())
                .containsExactly(
                        SomeData.of(2, "1"),
                        SomeData.of(2, "2"),
                        SomeData.of(2, "3"),
                        SomeData.of(2, "4")
                );
        assertThat(reader.read())
                .containsExactly(
                        SomeData.of(3, "1"),
                        SomeData.of(3, "2")
                );

        assertThat(reader.read()).isNull();
    }

    @Test
    void should_read_one_aggregate_of_one_item_when_data_size_isOne() throws Exception {

        AggregateReader<SomeData> reader = aggregateReader(List.of(SomeData.of(1, "1")));
        assertThat(reader.read())
                .containsExactly(SomeData.of(1, "1"));
        assertThat(reader.read()).isNull();
    }

    @Test
    void should_read_one_aggregate_when_same_id_on_all_data() throws Exception {

        AggregateReader<SomeData> reader = aggregateReader(List.of(SomeData.of(1, "1"), SomeData.of(1, "2")));
        assertThat(reader.read())
                .containsExactly(SomeData.of(1, "1"), SomeData.of(1, "2"));
        assertThat(reader.read()).isNull();
    }

    @Test
    void should_read_aggregate_of_one_item_when_not_same_id_provided() throws Exception {

        AggregateReader<SomeData> reader = aggregateReader(List.of(SomeData.of(1, "1"), SomeData.of(2, "2"), SomeData.of(3, "3")));
        assertThat(reader.read())
                .containsExactly(SomeData.of(1, "1"));
        assertThat(reader.read())
                .containsExactly(SomeData.of(2, "2"));
        assertThat(reader.read())
                .containsExactly(SomeData.of(3, "3"));
        assertThat(reader.read()).isNull();
    }

    @Test
    void should_read_null_when_empty_data() throws Exception {

        AggregateReader<SomeData> reader = aggregateReader(List.of());
        assertThat(reader.read()).isNull();
    }

    private static AggregateReader<SomeData> aggregateReader(List<SomeData> someData) {
        SingleItemPeekableItemReader<SomeData> peekableItemReader = new SingleItemPeekableItemReader<>();
        peekableItemReader.setDelegate(new ListItemReader<>(someData));
        return new AggregateReader<>(peekableItemReader);
    }

    @Value(staticConstructor = "of")
    private static class SomeData implements AggregateMatcher<SomeData> {
        int id;
        String data;

        @Override
        public boolean match(SomeData someData) {
            return id == someData.id;
        }
    }
}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions