diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..03220c5 --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..7c65633 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,13 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..be0fb2c --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/scala_compiler.xml b/.idea/scala_compiler.xml new file mode 100644 index 0000000..d84cc6a --- /dev/null +++ b/.idea/scala_compiler.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml new file mode 100644 index 0000000..e96534f --- /dev/null +++ b/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index aca358d..8416304 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,18 @@ 1.8.1 + + org.apache.commons + commons-dbcp2 + 2.5.0 + + + + mysql + mysql-connector-java + 5.1.6 + + diff --git a/src/main/java/streams/StocksPrice.java b/src/main/java/streams/StocksPrice.java new file mode 100644 index 0000000..1af2209 --- /dev/null +++ b/src/main/java/streams/StocksPrice.java @@ -0,0 +1,49 @@ +package streams; + +import java.util.Date; + +public class StocksPrice { + + private String stockSymbol; + private Date tradeDate; + private double openPrice; + private double highPrice; + + public Date getTradeDate() { + return tradeDate; + } + + public double getHighPrice() { + return highPrice; + } + + public double getOpenPrice() { + return openPrice; + } + + public void setStockSymbol(String stockSymbol) { + this.stockSymbol = stockSymbol; + } + + public String getStockSymbol() { + return stockSymbol; + } + + public void setHighPrice(double highPrice) { + this.highPrice = highPrice; + } + + public void setOpenPrice(double openPrice) { + this.openPrice = openPrice; + } + + public void setTradeDate(Date tradeDate) { + this.tradeDate = tradeDate; + } + + @Override + public String toString() { + return String.format("Symbol=%s;Date=%s;open price=%s ; HighPrice=%s", stockSymbol, tradeDate, openPrice, highPrice); + } + +} diff --git a/src/main/java/streams/TableStreamApplication.java b/src/main/java/streams/TableStreamApplication.java new file mode 100644 index 0000000..0990ce8 --- /dev/null +++ b/src/main/java/streams/TableStreamApplication.java @@ -0,0 +1,51 @@ +package streams; + +import org.apache.commons.dbcp2.BasicDataSource; +import streams.database.TableStreamBuilder; +import streams.database.Where; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class TableStreamApplication { + + public static void main(String... args) { + + BasicDataSource ds = createDataSource(); + + TableStreamBuilder stocksTable = TableStreamBuilder + .with(ds) + .build(StocksPrice.class, "stocks_price", () -> new StocksPrice()); + + Stream rows = stocksTable.stream(); + + long count = rows + .filter(Where.GT("volume", 1467200)) + .filter(Where.GT("open_price", 1108d)) + .count(); + + System.out.println(count); + + + List o = (List) stocksTable.stream() + .filter(Where.GT("volume", 1467200)) + .filter(Where.GT("open_price", 1108d)) + .limit(2) + .collect(Collectors.toList()); + + System.out.println(o); + + + } + + private static BasicDataSource createDataSource() { + BasicDataSource ds = new BasicDataSource(); + ds.setDriverClassName("com.mysql.jdbc.Driver"); + ds.setUrl("jdbc:mysql://localhost:3306/playground"); + ds.setUsername("root"); + ds.setPassword("changeme"); + return ds; + } + +} diff --git a/src/main/java/streams/database/NoOpStream.java b/src/main/java/streams/database/NoOpStream.java new file mode 100644 index 0000000..3286c22 --- /dev/null +++ b/src/main/java/streams/database/NoOpStream.java @@ -0,0 +1,211 @@ +package streams.database; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Optional; +import java.util.Spliterator; +import java.util.function.*; +import java.util.stream.*; + +public class NoOpStream implements Stream { + + @Override + public Stream filter(Predicate predicate) { + return null; + } + + @Override + public Stream map(Function mapper) { + return null; + } + + @Override + public IntStream mapToInt(ToIntFunction mapper) { + return null; + } + + @Override + public LongStream mapToLong(ToLongFunction mapper) { + return null; + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction mapper) { + return null; + } + + @Override + public Stream flatMap(Function> mapper) { + return null; + } + + @Override + public IntStream flatMapToInt(Function mapper) { + return null; + } + + @Override + public LongStream flatMapToLong(Function mapper) { + return null; + } + + @Override + public DoubleStream flatMapToDouble(Function mapper) { + return null; + } + + @Override + public Stream distinct() { + return null; + } + + @Override + public Stream sorted() { + return null; + } + + @Override + public Stream sorted(Comparator comparator) { + return null; + } + + @Override + public Stream peek(Consumer action) { + return null; + } + + @Override + public Stream limit(long maxSize) { + return null; + } + + @Override + public Stream skip(long n) { + return null; + } + + @Override + public void forEach(Consumer action) { + + } + + @Override + public void forEachOrdered(Consumer action) { + + } + + @Override + public Object[] toArray() { + return new Object[0]; + } + + @Override + public A[] toArray(IntFunction generator) { + return null; + } + + @Override + public T reduce(T identity, BinaryOperator accumulator) { + return null; + } + + @Override + public Optional reduce(BinaryOperator accumulator) { + return Optional.empty(); + } + + @Override + public U reduce(U identity, BiFunction accumulator, BinaryOperator combiner) { + return null; + } + + @Override + public R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) { + return null; + } + + @Override + public R collect(Collector collector) { + return null; + } + + @Override + public Optional min(Comparator comparator) { + return Optional.empty(); + } + + @Override + public Optional max(Comparator comparator) { + return Optional.empty(); + } + + @Override + public long count() { + return 0; + } + + @Override + public boolean anyMatch(Predicate predicate) { + return false; + } + + @Override + public boolean allMatch(Predicate predicate) { + return false; + } + + @Override + public boolean noneMatch(Predicate predicate) { + return false; + } + + @Override + public Optional findFirst() { + return Optional.empty(); + } + + @Override + public Optional findAny() { + return Optional.empty(); + } + + @Override + public Iterator iterator() { + return null; + } + + @Override + public Spliterator spliterator() { + return null; + } + + @Override + public boolean isParallel() { + return false; + } + + @Override + public Stream sequential() { + return null; + } + + @Override + public Stream parallel() { + return null; + } + + @Override + public Stream unordered() { + return null; + } + + @Override + public Stream onClose(Runnable closeHandler) { + return null; + } + + @Override + public void close() { + + } +} diff --git a/src/main/java/streams/database/TableStreamBuilder.java b/src/main/java/streams/database/TableStreamBuilder.java new file mode 100644 index 0000000..3705d55 --- /dev/null +++ b/src/main/java/streams/database/TableStreamBuilder.java @@ -0,0 +1,217 @@ +package streams.database; + +import javax.sql.DataSource; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class TableStreamBuilder { + + static Pattern columnNamePattern = Pattern.compile("set([A-Z].*)([A-Z].*)"); + + private final DataSource ds; + private Class tableClass; + private String tableName; + private Supplier rowSupplier; + private List columnNames; + + public TableStreamBuilder(DataSource ds) { + this.ds = ds; + } + + public Stream stream() { + + List pipeLine = new ArrayList<>(); + + Stream dataStream = new NoOpStream() { + + @Override + public Stream filter(Predicate predicate) { + pipeLine.add(predicate); + return this; + } + + @Override + public Stream limit(long maxSize) { + pipeLine.add(new SQLLimit(maxSize)); + return this; + } + + @Override + public long count() { + + StringBuilder where = new StringBuilder(); + + decodePipelineStage(where, pipeLine); + + String sql = String.format("SELECT COUNT(1) FROM %s WHERE %s", tableName, where); + try (Connection con = openConnection(); ResultSet rs = executeQuery(sql, con)) { + + while (rs.next()) { + return rs.getLong(1); + } + return 0; + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public R collect(Collector collector) { + A container = collector.supplier().get(); + + String cols = columnNames + .stream() + .map(col -> col.columnName) + .collect(Collectors.joining(",")); + + StringBuilder where = new StringBuilder(); + decodePipelineStage(where, pipeLine); + + String sql = String.format("SELECT %s FROM %s WHERE %s", cols, tableName, where); + System.out.println(sql); + + try (Connection con = openConnection(); ResultSet rs = executeQuery(sql, con)) { + + while (rs.next()) { + T row = buildRow(rs); + collector.accumulator().accept(container, row); + } + + } catch (SQLException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + + + return collector.finisher().apply(container); + } + + }; + + return dataStream; + } + + private Connection openConnection() throws SQLException { + return ds.getConnection(); + } + + private ResultSet executeQuery(String sql, Connection con) throws SQLException { + return con.createStatement().executeQuery(sql); + } + + private T buildRow(ResultSet rs) throws SQLException, IllegalAccessException, InvocationTargetException { + T row = rowSupplier.get(); + int index = 0; + while (index < columnNames.size()) { + ColumnMetaData columnMetaData = columnNames.get(index); + Object value = rs.getObject(index + 1); + if (!rs.wasNull()) { + columnMetaData.method().invoke(row, value); + } + index++; + } + return row; + } + + private void decodePipelineStage(StringBuilder where, List pipeLine) { + for (Object stage : pipeLine) { + + if (isPredicate(stage)) { + processPredicate(where, (Predicate) stage); + } else if (isLimit(stage)) { + SQLLimit limit = (SQLLimit) stage; + where.append(" LIMIT " + limit.maxSize + " "); + } + } + } + + private boolean isLimit(Object stage) { + return (stage instanceof SQLLimit); + } + + private void processPredicate(StringBuilder where, Predicate stage) { + Predicate predicate = stage; + + if (predicate instanceof Where.GreaterThan) { + + Where.GreaterThan gt = (Where.GreaterThan) predicate; + if (where.length() > 0) { + where.append(" AND "); + } + where.append(String.format("%s > %s", gt.columnName(), gt.value())); + } + } + + private boolean isPredicate(Object stage) { + return stage instanceof Predicate; + } + + public TableStreamBuilder build(Class row, String tableName, Supplier supplier) { + + this.tableClass = row; + this.tableName = tableName; + this.rowSupplier = supplier; + this.columnNames = identifyColumnNames(row); + + return this; + } + + private static List identifyColumnNames(Class row) { + return Arrays.asList(row.getMethods()).stream() + .filter(method -> method.getName().startsWith("set")) + .map(method -> new TableStreamBuilder.ColumnMetaData(method, columnNamePattern.matcher(method.getName()))) + .filter(x -> x.matcher.matches()) + .map(x -> x.build()) + .collect(Collectors.toList()); + } + + public static TableStreamBuilder with(DataSource ds) { + return new TableStreamBuilder(ds); + } + + static class ColumnMetaData { + + final Method setMethod; + String columnName; + final Matcher matcher; + + ColumnMetaData(Method methodRef, Matcher matcher) { + this.setMethod = methodRef; + this.matcher = matcher; + } + + public Method method() { + return setMethod; + } + + public String columnName() { + return columnName; + } + + public ColumnMetaData build() { + columnName = matcher.group(1).toLowerCase() + "_" + matcher.group(2).toLowerCase(); + return this; + } + } + + private static class SQLLimit { + private final long maxSize; + + public SQLLimit(long maxSize) { + this.maxSize = maxSize; + } + } +} diff --git a/src/main/java/streams/database/Where.java b/src/main/java/streams/database/Where.java new file mode 100644 index 0000000..055dbc3 --- /dev/null +++ b/src/main/java/streams/database/Where.java @@ -0,0 +1,57 @@ +package streams.database; + +import java.util.function.Predicate; + +public class Where { + + enum Operator { + GT + } + + public static Predicate GT(String col, int value) { + return new GreaterThan(Operator.GT, col, value); + } + + public static Predicate GT(String col, double value) { + return new GreaterThan(Operator.GT, col, value); + } + + public static abstract class WherePredicate implements Predicate { + + private final Operator op; + private final String columnName; + + WherePredicate(Operator op, String columnName) { + this.op = op; + this.columnName = columnName; + } + + public Operator Op() { + return op; + } + + public String columnName() { + return columnName; + } + } + + public static class GreaterThan extends WherePredicate { + + private final Object value; + + GreaterThan(Operator op, String columnName, Object value) { + super(op, columnName); + this.value = value; + } + + public Object value() { + return value; + } + + @Override + public boolean test(T t) { + return false; + } + } + +}