Skip to content

Commit 057e391

Browse files
committed
Merge remote-tracking branch 'origin/candidate-8.12.x'
Signed-off-by: Gavin Halliday <[email protected]>
2 parents fc58ee7 + a8c5970 commit 057e391

File tree

4 files changed

+142
-8
lines changed

4 files changed

+142
-8
lines changed

commons-hpcc/src/main/java/org/hpccsystems/commons/utils/Utils.java

+2
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,8 @@ public static boolean isBooleanKeyWord(String str)
657657
*/
658658
public static BigInteger extractUnsigned8Val(long unsigned8)
659659
{
660+
// Shift upper 32 bits of the incoming long value down and use bitwise & to get unsigned value of upper 32 bits
661+
// Shift upper 32 bits back once inside of BigInteger and add unsigned value of lower 32 bits
660662
return (BigInteger.valueOf((unsigned8 >> 32) & 0xffffffffL).shiftLeft(32)).add((BigInteger.valueOf(unsigned8 & 0xffffffffL)));
661663
}
662664
}

dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java

+29-6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.hpccsystems.commons.ecl.HpccSrcType;
2929
import org.hpccsystems.commons.errors.HpccFileException;
3030
import org.hpccsystems.commons.errors.UnparsableContentException;
31+
import org.hpccsystems.commons.utils.Utils;
3132

3233
class CountingInputStream extends InputStream
3334
{
@@ -129,6 +130,7 @@ public class BinaryRecordReader implements IRecordReader
129130
protected boolean defaultLE;
130131
private long streamPosAfterLastRecord = 0;
131132
private boolean isIndex = false;
133+
private boolean useDecimalForUnsigned8 = false;
132134

133135
private byte[] scratchBuffer = new byte[BUFFER_GROW_SIZE];
134136

@@ -219,6 +221,16 @@ public void initialize(IRecordBuilder rb) throws Exception
219221
}
220222
}
221223

224+
/**
225+
* Determines if unsigned 8 values should be parsed into BigDecimals to avoid long overflow.
226+
*
227+
* @param useDecimal use decimal
228+
*/
229+
public void setUseDecimalForUnsigned8(boolean useDecimal)
230+
{
231+
useDecimalForUnsigned8 = useDecimal;
232+
}
233+
222234
/**
223235
* Should be set if this record reader is reading an index file.
224236
*
@@ -350,16 +362,27 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars
350362
if (fd.isUnsigned())
351363
{
352364
intValue = getUnsigned((int) fd.getDataLen(), fd.getSourceType() == HpccSrcType.LITTLE_ENDIAN);
353-
if (intValue < 0)
365+
if (useDecimalForUnsigned8 && fd.getDataLen() == 8)
366+
{
367+
BigInteger bi = Utils.extractUnsigned8Val(intValue);
368+
fieldValue = new BigDecimal(bi);
369+
}
370+
else
354371
{
355-
messages.addMessage("Warning: Possible unsigned overflow in column: '" + fd.getFieldName() + "'. Convert values to BigInteger via org.hpccsystems.commons.utils.extractUnsigned8 if necessary." );
372+
fieldValue = Long.valueOf(intValue);
373+
if (intValue < 0)
374+
{
375+
messages.addMessage("Warning: Possible unsigned overflow in column: '" + fd.getFieldName()
376+
+ "'. Convert values to BigInteger via org.hpccsystems.commons.utils.extractUnsigned8 if necessary, "
377+
+ " or call BinaryRecordReader.setUseDecimalForUnsigned8() before reading to convert unsigned8 values to BigDecimal values.");
378+
}
356379
}
357380
}
358381
else
359382
{
360383
intValue = getInt((int) fd.getDataLen(), fd.getSourceType() == HpccSrcType.LITTLE_ENDIAN, fd.isBiased());
384+
fieldValue = Long.valueOf(intValue);
361385
}
362-
fieldValue = Long.valueOf(intValue);
363386
break;
364387
case REAL:
365388
// fixed number of bytes (4 or 8) in type info
@@ -936,7 +959,7 @@ private String getNullTerminatedString(HpccSrcType stype) throws IOException
936959
throw new IOException("Error, unexpected EOS while constructing UTF16 string.");
937960
}
938961

939-
readSize = (readSize / 2) * 2;
962+
readSize = ((readSize + 1) / 2) * 2;
940963
if (readSize > OPTIMIZED_STRING_READ_AHEAD)
941964
{
942965
readSize = OPTIMIZED_STRING_READ_AHEAD;
@@ -947,7 +970,7 @@ private String getNullTerminatedString(HpccSrcType stype) throws IOException
947970

948971
for (int j = 0; j < readSize-1; j += 2)
949972
{
950-
if (scratchBuffer[j] == '\0' && scratchBuffer[j + 1] == '\0')
973+
if (scratchBuffer[strByteLen + j] == '\0' && scratchBuffer[strByteLen + j + 1] == '\0')
951974
{
952975
eosLocation = j;
953976
break;
@@ -995,7 +1018,7 @@ private String getNullTerminatedString(HpccSrcType stype) throws IOException
9951018

9961019
for (int j = 0; j < readSize; j++)
9971020
{
998-
if (scratchBuffer[j] == '\0')
1021+
if (scratchBuffer[strByteLen + j] == '\0')
9991022
{
10001023
eosLocation = j;
10011024
break;

dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ public FileReadResumeInfo getFileReadResumeInfo(Long streamPosition)
240240
return resumeInfo;
241241
}
242242

243+
/**
244+
* Returns the number of messages created during the reading process
245+
*
246+
* @return number of messages created
247+
*/
243248
public int getRemoteReadMessageCount()
244249
{
245250
int count = 0;
@@ -249,6 +254,11 @@ public int getRemoteReadMessageCount()
249254
return count;
250255
}
251256

257+
/**
258+
* Returns messages created during the file reading process
259+
*
260+
* @return Messages concatenated into a String
261+
*/
252262
public String getRemoteReadMessages()
253263
{
254264
String report = "";
@@ -329,7 +339,6 @@ public void close() throws Exception
329339
}
330340

331341
/**
332-
* getAvailable
333342
* Returns the number of bytes available to read immediately.
334343
*
335344
* @return the available
@@ -341,11 +350,21 @@ public int getAvailable() throws IOException
341350
return this.binaryRecordReader.getAvailable();
342351
}
343352

353+
/**
354+
* Returns the RowServiceInputStream used to read the file from dafilesrv
355+
*
356+
* @return the input stream
357+
*/
344358
public RowServiceInputStream getInputStream()
345359
{
346360
return this.inputStream;
347361
}
348362

363+
/**
364+
* Returns the BinaryRecordReader used to construct records
365+
*
366+
* @return the record reader
367+
*/
349368
public BinaryRecordReader getRecordReader()
350369
{
351370
return this.binaryRecordReader;

dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java

+91-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.hpccsystems.commons.ecl.HpccSrcType;
3939
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
4040
import org.hpccsystems.commons.errors.HpccFileException;
41+
import org.hpccsystems.commons.utils.Utils;
4142
import org.hpccsystems.ws.client.HPCCWsDFUClient;
4243
import org.hpccsystems.ws.client.HPCCWsWorkUnitsClient;
4344
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;
@@ -275,7 +276,6 @@ public void getNullMetadataTest() throws Exception
275276
assertNull("Meta should be null for nonexistent file",meta);
276277
}
277278

278-
279279
private static final String ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_";
280280
private static final SecureRandom RANDOM = new SecureRandom();
281281

@@ -334,6 +334,87 @@ public void integrationLargeRecordTest() throws Exception
334334
}
335335
}
336336
}
337+
338+
@Test
339+
public void unsigned8ToDecimalTest() throws Exception
340+
{
341+
// Create a large record dataset
342+
FieldDef[] fieldDefs = new FieldDef[3];
343+
fieldDefs[0] = new FieldDef("field1", FieldType.INTEGER, "UNSIGNED8", 8, true, true, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
344+
fieldDefs[1] = new FieldDef("field2", FieldType.INTEGER, "UNSIGNED8", 8, true, true, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
345+
fieldDefs[2] = new FieldDef("field3", FieldType.INTEGER, "UNSIGNED8", 8, true, true, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
346+
FieldDef recordDef = new FieldDef("RootRecord", FieldType.RECORD, "rec", 4, false, false, HpccSrcType.LITTLE_ENDIAN, fieldDefs);
347+
348+
List<HPCCRecord> originalRecords = new ArrayList<HPCCRecord>();
349+
for (int i = 0; i < 10; i++)
350+
{
351+
Object[] fields = new Object[3];
352+
fields[0] = new BigDecimal(Utils.extractUnsigned8Val((long) Long.MIN_VALUE)); // Max U8 value
353+
fields[1] = new BigDecimal(0); // Min U8 value
354+
fields[2] = new BigDecimal(BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE)); // First U8 value that would cause an overflow
355+
HPCCRecord record = new HPCCRecord(fields, recordDef);
356+
originalRecords.add(record);
357+
}
358+
359+
String datasetName = "benchmark::unsigned8::10rows";
360+
writeFile(originalRecords, datasetName, recordDef, connTO);
361+
362+
HPCCFile file = new HPCCFile(datasetName, connString , hpccUser, hpccPass);
363+
List<HPCCRecord> readRecords = readFile(file, connTO, false, true);
364+
if (readRecords.size() < 10)
365+
{
366+
Assert.fail("Failed to read " + datasetName);
367+
}
368+
369+
for (int i = 0; i < 10; i++)
370+
{
371+
HPCCRecord readRecord = readRecords.get(i);
372+
HPCCRecord originalRecord = originalRecords.get(i);
373+
374+
assertEquals(readRecord, originalRecord);
375+
}
376+
}
377+
378+
@Test
379+
public void longStringTest() throws Exception
380+
{
381+
// Create a large record dataset
382+
FieldDef[] fieldDefs = new FieldDef[4];
383+
fieldDefs[0] = new FieldDef("LongVarUnicode", FieldType.VAR_STRING, "", 4, false, false, HpccSrcType.UTF16LE, new FieldDef[0]);
384+
fieldDefs[1] = new FieldDef("LongUnicode", FieldType.STRING, "", 64, true, false, HpccSrcType.UTF16LE, new FieldDef[0]);
385+
fieldDefs[2] = new FieldDef("LongVarString", FieldType.VAR_STRING, "", 4, false, false, HpccSrcType.SINGLE_BYTE_CHAR, new FieldDef[0]);
386+
fieldDefs[3] = new FieldDef("LongString", FieldType.STRING, "", 64, true, false, HpccSrcType.SINGLE_BYTE_CHAR, new FieldDef[0]);
387+
FieldDef recordDef = new FieldDef("RootRecord", FieldType.RECORD, "rec", 4, false, false, HpccSrcType.LITTLE_ENDIAN, fieldDefs);
388+
389+
List<HPCCRecord> originalRecords = new ArrayList<HPCCRecord>();
390+
for (int i = 0; i < 10; i++)
391+
{
392+
Object[] fields = new Object[4];
393+
fields[0] = generateRandomString(1024);
394+
fields[1] = generateRandomString(64);
395+
fields[2] = generateRandomString(1024);
396+
fields[3] = generateRandomString(64);
397+
HPCCRecord record = new HPCCRecord(fields, recordDef);
398+
originalRecords.add(record);
399+
}
400+
401+
String datasetName = "benchmark::long_string::10rows";
402+
writeFile(originalRecords, datasetName, recordDef,connTO);
403+
404+
HPCCFile file = new HPCCFile(datasetName, connString , hpccUser, hpccPass);
405+
List<HPCCRecord> readRecords = readFile(file, connTO, false);
406+
if (readRecords.size() < 10)
407+
{
408+
Assert.fail("Failed to read " + datasetName + " dataset");
409+
}
410+
411+
for (int i = 0; i < 10; i++)
412+
{
413+
HPCCRecord originalRecord = originalRecords.get(i);
414+
HPCCRecord readRecord = originalRecords.get(i);
415+
Assert.assertEquals(originalRecord, readRecord);
416+
}
417+
}
337418

338419
@Test
339420
public void numericOverflowTest() throws Exception
@@ -703,15 +784,22 @@ public void resumeFileReadTest() throws Exception
703784
}
704785

705786
public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout) throws Exception
787+
{
788+
return readFile(file, connectTimeoutMillis, shouldForceTimeout, false);
789+
}
790+
791+
public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout, boolean useDecimalForUnsigned8) throws Exception
706792
{
707793
if (file == null)
708794
{
709795
Assert.fail("HPCCFile construction failed.");
710796
}
797+
711798
if (connectTimeoutMillis != null)
712799
{
713800
file.setFileAccessExpirySecs(connectTimeoutMillis/1000);
714801
}
802+
715803
DataPartition[] fileParts = file.getFileParts();
716804
if (fileParts == null || fileParts.length == 0)
717805
{
@@ -731,6 +819,8 @@ public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, bo
731819
{
732820
HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
733821
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[i], originalRD, recordBuilder);
822+
fileReader.getRecordReader().setUseDecimalForUnsigned8(useDecimalForUnsigned8);
823+
734824
fileReaders.add(fileReader);
735825
}
736826
catch (Exception e)

0 commit comments

Comments
 (0)