Skip to content

Commit 03b7fe4

Browse files
authored
JAPI-499 BinaryRecordReader support U8 to Decimal (hpcc-systems#599)
- Added flag to support treating U8s as BigDecimals - Created JUnit for U8 to Decimal Signed-off-by: James McMullan [email protected] Signed-off-by: James McMullan [email protected]
1 parent 2e21475 commit 03b7fe4

File tree

4 files changed

+98
-4
lines changed

4 files changed

+98
-4
lines changed

commons-hpcc/src/main/java/org/hpccsystems/commons/utils/Utils.java

+2
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,8 @@ public static boolean isBooleanKeyWord(String str)
657657
*/
658658
public static BigInteger extractUnsigned8Val(long unsigned8)
659659
{
660+
// Shift upper 32 bits of the incoming long value down and use bitwise & to get unsigned value of upper 32 bits
661+
// Shift upper 32 bits back once inside of BigInteger and add unsigned value of lower 32 bits
660662
return (BigInteger.valueOf((unsigned8 >> 32) & 0xffffffffL).shiftLeft(32)).add((BigInteger.valueOf(unsigned8 & 0xffffffffL)));
661663
}
662664
}

dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java

+26-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.hpccsystems.commons.ecl.HpccSrcType;
2929
import org.hpccsystems.commons.errors.HpccFileException;
3030
import org.hpccsystems.commons.errors.UnparsableContentException;
31+
import org.hpccsystems.commons.utils.Utils;
3132

3233
class CountingInputStream extends InputStream
3334
{
@@ -129,6 +130,7 @@ public class BinaryRecordReader implements IRecordReader
129130
protected boolean defaultLE;
130131
private long streamPosAfterLastRecord = 0;
131132
private boolean isIndex = false;
133+
private boolean useDecimalForUnsigned8 = false;
132134

133135
private byte[] scratchBuffer = new byte[BUFFER_GROW_SIZE];
134136

@@ -219,6 +221,16 @@ public void initialize(IRecordBuilder rb) throws Exception
219221
}
220222
}
221223

224+
/**
225+
* Determines if unsigned 8 values should be parsed into BigDecimals to avoid long overflow.
226+
*
227+
* @param useDecimal use decimal
228+
*/
229+
public void setUseDecimalForUnsigned8(boolean useDecimal)
230+
{
231+
useDecimalForUnsigned8 = useDecimal;
232+
}
233+
222234
/**
223235
* Should be set if this record reader is reading an index file.
224236
*
@@ -350,16 +362,27 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars
350362
if (fd.isUnsigned())
351363
{
352364
intValue = getUnsigned((int) fd.getDataLen(), fd.getSourceType() == HpccSrcType.LITTLE_ENDIAN);
353-
if (intValue < 0)
365+
if (useDecimalForUnsigned8 && fd.getDataLen() == 8)
366+
{
367+
BigInteger bi = Utils.extractUnsigned8Val(intValue);
368+
fieldValue = new BigDecimal(bi);
369+
}
370+
else
354371
{
355-
messages.addMessage("Warning: Possible unsigned overflow in column: '" + fd.getFieldName() + "'. Convert values to BigInteger via org.hpccsystems.commons.utils.extractUnsigned8 if necessary." );
372+
fieldValue = Long.valueOf(intValue);
373+
if (intValue < 0)
374+
{
375+
messages.addMessage("Warning: Possible unsigned overflow in column: '" + fd.getFieldName()
376+
+ "'. Convert values to BigInteger via org.hpccsystems.commons.utils.extractUnsigned8 if necessary, "
377+
+ " or call BinaryRecordReader.setUseDecimalForUnsigned8() before reading to convert unsigned8 values to BigDecimal values.");
378+
}
356379
}
357380
}
358381
else
359382
{
360383
intValue = getInt((int) fd.getDataLen(), fd.getSourceType() == HpccSrcType.LITTLE_ENDIAN, fd.isBiased());
384+
fieldValue = Long.valueOf(intValue);
361385
}
362-
fieldValue = Long.valueOf(intValue);
363386
break;
364387
case REAL:
365388
// fixed number of bytes (4 or 8) in type info

dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ public FileReadResumeInfo getFileReadResumeInfo(Long streamPosition)
240240
return resumeInfo;
241241
}
242242

243+
/**
244+
* Returns the number of messages created during the reading process
245+
*
246+
* @return number of messages created
247+
*/
243248
public int getRemoteReadMessageCount()
244249
{
245250
int count = 0;
@@ -249,6 +254,11 @@ public int getRemoteReadMessageCount()
249254
return count;
250255
}
251256

257+
/**
258+
* Returns messages created during the file reading process
259+
*
260+
* @return Messages concatenated into a String
261+
*/
252262
public String getRemoteReadMessages()
253263
{
254264
String report = "";
@@ -329,7 +339,6 @@ public void close() throws Exception
329339
}
330340

331341
/**
332-
* getAvailable
333342
* Returns the number of bytes available to read immediately.
334343
*
335344
* @return the available
@@ -341,11 +350,21 @@ public int getAvailable() throws IOException
341350
return this.binaryRecordReader.getAvailable();
342351
}
343352

353+
/**
354+
* Returns the RowServiceInputStream used to read the file from dafilesrv
355+
*
356+
* @return the input stream
357+
*/
344358
public RowServiceInputStream getInputStream()
345359
{
346360
return this.inputStream;
347361
}
348362

363+
/**
364+
* Returns the BinaryRecordReader used to construct records
365+
*
366+
* @return the record reader
367+
*/
349368
public BinaryRecordReader getRecordReader()
350369
{
351370
return this.binaryRecordReader;

dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java

+50
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.hpccsystems.commons.ecl.HpccSrcType;
3939
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
4040
import org.hpccsystems.commons.errors.HpccFileException;
41+
import org.hpccsystems.commons.utils.Utils;
4142
import org.hpccsystems.ws.client.HPCCWsDFUClient;
4243
import org.hpccsystems.ws.client.HPCCWsWorkUnitsClient;
4344
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;
@@ -334,6 +335,46 @@ public void integrationLargeRecordTest() throws Exception
334335
}
335336
}
336337
}
338+
339+
@Test
340+
public void unsigned8ToDecimalTest() throws Exception
341+
{
342+
// Create a large record dataset
343+
FieldDef[] fieldDefs = new FieldDef[3];
344+
fieldDefs[0] = new FieldDef("field1", FieldType.INTEGER, "UNSIGNED8", 8, true, true, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
345+
fieldDefs[1] = new FieldDef("field2", FieldType.INTEGER, "UNSIGNED8", 8, true, true, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
346+
fieldDefs[2] = new FieldDef("field3", FieldType.INTEGER, "UNSIGNED8", 8, true, true, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
347+
FieldDef recordDef = new FieldDef("RootRecord", FieldType.RECORD, "rec", 4, false, false, HpccSrcType.LITTLE_ENDIAN, fieldDefs);
348+
349+
List<HPCCRecord> originalRecords = new ArrayList<HPCCRecord>();
350+
for (int i = 0; i < 10; i++)
351+
{
352+
Object[] fields = new Object[3];
353+
fields[0] = new BigDecimal(Utils.extractUnsigned8Val((long) Long.MIN_VALUE)); // Max U8 value
354+
fields[1] = new BigDecimal(0); // Min U8 value
355+
fields[2] = new BigDecimal(BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE)); // First U8 value that would cause an overflow
356+
HPCCRecord record = new HPCCRecord(fields, recordDef);
357+
originalRecords.add(record);
358+
}
359+
360+
String datasetName = "benchmark::unsigned8::10rows";
361+
writeFile(originalRecords, datasetName, recordDef, connTO);
362+
363+
HPCCFile file = new HPCCFile(datasetName, connString , hpccUser, hpccPass);
364+
List<HPCCRecord> readRecords = readFile(file, connTO, false, true);
365+
if (readRecords.size() < 10)
366+
{
367+
Assert.fail("Failed to read " + datasetName);
368+
}
369+
370+
for (int i = 0; i < 10; i++)
371+
{
372+
HPCCRecord readRecord = readRecords.get(i);
373+
HPCCRecord originalRecord = originalRecords.get(i);
374+
375+
assertEquals(readRecord, originalRecord);
376+
}
377+
}
337378

338379
@Test
339380
public void numericOverflowTest() throws Exception
@@ -703,15 +744,22 @@ public void resumeFileReadTest() throws Exception
703744
}
704745

705746
public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout) throws Exception
747+
{
748+
return readFile(file, connectTimeoutMillis, shouldForceTimeout, false);
749+
}
750+
751+
public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout, boolean useDecimalForUnsigned8) throws Exception
706752
{
707753
if (file == null)
708754
{
709755
Assert.fail("HPCCFile construction failed.");
710756
}
757+
711758
if (connectTimeoutMillis != null)
712759
{
713760
file.setFileAccessExpirySecs(connectTimeoutMillis/1000);
714761
}
762+
715763
DataPartition[] fileParts = file.getFileParts();
716764
if (fileParts == null || fileParts.length == 0)
717765
{
@@ -731,6 +779,8 @@ public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, bo
731779
{
732780
HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
733781
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[i], originalRD, recordBuilder);
782+
fileReader.getRecordReader().setUseDecimalForUnsigned8(useDecimalForUnsigned8);
783+
734784
fileReaders.add(fileReader);
735785
}
736786
catch (Exception e)

0 commit comments

Comments
 (0)