11package software .amazon .jdbc .plugin .cache ;
22
3+ import org .checkerframework .checker .nullness .qual .Nullable ;
34import java .io .ByteArrayInputStream ;
45import java .io .ByteArrayOutputStream ;
56import java .io .InputStream ;
67import java .io .IOException ;
78import java .io .Reader ;
8- import java .io .Serializable ;
99import java .io .ObjectInputStream ;
1010import java .io .ObjectOutputStream ;
1111import java .math .BigDecimal ;
4242
4343public class CachedResultSet implements ResultSet {
4444
45- public static class CachedRow implements Serializable {
45+ public static class CachedRow {
4646 private final Object [] rowData ;
47+ final byte [] @ Nullable [] rawData ;
4748
4849 public CachedRow (int numColumns ) {
4950 rowData = new Object [numColumns ];
51+ rawData = new byte [numColumns ][];
5052 }
5153
52- public void put (final int columnIndex , final Object columnValue ) throws SQLException {
54+ private void checkColumnIndex (final int columnIndex ) throws SQLException {
5355 if (columnIndex < 1 || columnIndex > rowData .length ) {
54- throw new SQLException ("Invalid Column Index when populating CachedRow: " + columnIndex );
56+ throw new SQLException ("Invalid Column Index when operating CachedRow: " + columnIndex );
5557 }
58+ }
59+
60+ public void put (final int columnIndex , final Object columnValue ) throws SQLException {
61+ checkColumnIndex (columnIndex );
5662 rowData [columnIndex -1 ] = columnValue ;
5763 }
5864
65+ public void putRaw (final int columnIndex , final byte [] rawColumnValue ) throws SQLException {
66+ checkColumnIndex (columnIndex );
67+ rawData [columnIndex -1 ] = rawColumnValue ;
68+ }
69+
5970 public Object get (final int columnIndex ) throws SQLException {
60- if (columnIndex < 1 || columnIndex > rowData .length ) {
61- throw new SQLException ("Invalid Column Index when getting CachedRow value: " + columnIndex );
71+ checkColumnIndex (columnIndex );
72+ // De-serialize the data object from raw bytes if needed.
73+ if (rowData [columnIndex -1 ] == null && rawData [columnIndex -1 ] != null ) {
74+ long startTime = System .nanoTime ();
75+ try (ByteArrayInputStream bis = new ByteArrayInputStream (rawData [columnIndex - 1 ]);
76+ ObjectInputStream ois = new ObjectInputStream (bis )) {
77+ rowData [columnIndex - 1 ] = ois .readObject ();
78+ rawData [columnIndex - 1 ] = null ;
79+ } catch (ClassNotFoundException e ) {
80+ throw new SQLException ("ClassNotFoundException while de-serializing caching resultSet for column: " + columnIndex , e );
81+ } catch (IOException e ) {
82+ throw new SQLException ("IOException while de-serializing caching resultSet for column: " + columnIndex , e );
83+ }
6284 }
6385 return rowData [columnIndex - 1 ];
6486 }
@@ -73,6 +95,12 @@ public Object get(final int columnIndex) throws SQLException {
7395 private final HashMap <String , Integer > columnNames ;
7496 private volatile boolean closed ;
7597
98+ /**
99+ * Create a CachedResultSet out of the original ResultSet queried from the database.
100+ * @param resultSet The ResultSet queried from the underlying database (not a CachedResultSet).
101+ * @return CachedResultSet that captures the metadata and the rows of the input ResultSet.
102+ * @throws SQLException
103+ */
76104 public CachedResultSet (final ResultSet resultSet ) throws SQLException {
77105 ResultSetMetaData srcMetadata = resultSet .getMetaData ();
78106 final int numColumns = srcMetadata .getColumnCount ();
@@ -116,14 +144,29 @@ private CachedResultSet(final CachedResultSetMetaData md, final ArrayList<Cached
116144 wasNullFlag = false ;
117145 }
118146
147+ // Serialize the content of metadata and data rows for the current CachedResultSet into a byte array
119148 public byte [] serializeIntoByteArray () throws SQLException {
149+ long startTime = System .nanoTime ();
120150 // Serialize the metadata and then the rows
121151 try (ByteArrayOutputStream baos = new ByteArrayOutputStream ();
122152 ObjectOutputStream output = new ObjectOutputStream (baos )) {
123- output .writeObject (this . metadata );
153+ output .writeObject (metadata );
124154 output .writeInt (rows .size ());
155+ int numColumns = metadata .getColumnCount ();
125156 while (this .next ()) {
126- output .writeObject (rows .get (currentRow ));
157+ // serialize individual column fields in each row
158+ CachedRow row = rows .get (currentRow );
159+ for (int i = 0 ; i < numColumns ; i ++) {
160+ try (ByteArrayOutputStream objBytes = new ByteArrayOutputStream ();
161+ ObjectOutputStream objStream = new ObjectOutputStream (objBytes )) {
162+ objStream .writeObject (row .get (i + 1 ));
163+ objStream .flush ();
164+ byte [] dataByteArray = objBytes .toByteArray ();
165+ int serializedLength = dataByteArray .length ;
166+ output .writeInt (serializedLength );
167+ output .write (dataByteArray , 0 , serializedLength );
168+ }
169+ }
127170 }
128171 output .flush ();
129172 return baos .toByteArray ();
@@ -132,13 +175,35 @@ public byte[] serializeIntoByteArray() throws SQLException {
132175 }
133176 }
134177
178+ /**
179+ * Form a ResultSet from the raw data from the cache server. Each of the column objects are stored as
180+ * raw bytes and the actual de-serialization into Java objects will happen lazily upon access later on.
181+ */
135182 public static ResultSet deserializeFromByteArray (byte [] data ) throws SQLException {
136- try (ByteArrayInputStream bis = new ByteArrayInputStream (data ); ObjectInputStream ois = new ObjectInputStream (bis )) {
183+ long startTime = System .nanoTime ();
184+ try (ByteArrayInputStream bis = new ByteArrayInputStream (data );
185+ ObjectInputStream ois = new ObjectInputStream (bis )) {
137186 CachedResultSetMetaData metadata = (CachedResultSetMetaData ) ois .readObject ();
138187 int numRows = ois .readInt ();
188+ int numColumns = metadata .getColumnCount ();
139189 ArrayList <CachedRow > resultRows = new ArrayList <>(numRows );
140190 for (int i = 0 ; i < numRows ; i ++) {
141- resultRows .add ((CachedRow ) ois .readObject ());
191+ // Store the raw bytes for each column object in CachedRow
192+ final CachedRow row = new CachedRow (numColumns );
193+ for (int j = 0 ; j < numColumns ; j ++) {
194+ int nextObjSize = ois .readInt (); // The size of the next serialized object in its raw bytes form
195+ byte [] objData = new byte [nextObjSize ];
196+ int lengthRead = 0 ;
197+ while (lengthRead < nextObjSize ) {
198+ int bytesRead = ois .read (objData , lengthRead , nextObjSize -lengthRead );
199+ if (bytesRead == -1 ) {
200+ throw new SQLException ("End of stream reached when reading the data for CachedResultSet" );
201+ }
202+ lengthRead += bytesRead ;
203+ }
204+ row .putRaw (j +1 , objData );
205+ }
206+ resultRows .add (row );
142207 }
143208 return new CachedResultSet (metadata , resultRows );
144209 } catch (ClassNotFoundException e ) {
0 commit comments