12
12
*******************************************************************************/
13
13
package org .hpccsystems .dfs .client ;
14
14
15
+ import org .hpccsystems .dfs .client .Utils ;
16
+
15
17
import org .hpccsystems .commons .ecl .FieldDef ;
16
18
import org .hpccsystems .commons .ecl .RecordDefinitionTranslator ;
17
19
import org .hpccsystems .commons .errors .HpccFileException ;
20
+
21
+ import io .opentelemetry .api .common .AttributeKey ;
22
+ import io .opentelemetry .api .common .Attributes ;
23
+ import io .opentelemetry .api .trace .Span ;
24
+ import io .opentelemetry .semconv .ServerAttributes ;
25
+
18
26
import org .apache .logging .log4j .Logger ;
19
27
import org .apache .logging .log4j .LogManager ;
20
28
@@ -48,6 +56,9 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
48
56
private long openTimeMs = 0 ;
49
57
private long recordsRead = 0 ;
50
58
59
+ private Span readSpan = null ;
60
+ private String readSpanName = null ;
61
+
51
62
public static final int NO_RECORD_LIMIT = -1 ;
52
63
public static final int DEFAULT_READ_SIZE_OPTION = -1 ;
53
64
public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1 ;
@@ -204,6 +215,22 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
204
215
this .createPrefetchThread = createPrefetchThread ;
205
216
this .socketOpTimeoutMs = socketOpTimeoutMs ;
206
217
218
+ this .readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp .getFileName () + "_" + dp .getThisPart ();
219
+ this .readSpan = Utils .createSpan (readSpanName );
220
+
221
+ String primaryIP = dp .getCopyIP (0 );
222
+ String secondaryIP = "" ;
223
+ if (dp .getCopyCount () > 1 )
224
+ {
225
+ secondaryIP = dp .getCopyIP (1 );
226
+ }
227
+
228
+ Attributes attributes = Attributes .of ( AttributeKey .stringKey ("server.primary.address" ), primaryIP ,
229
+ AttributeKey .stringKey ("server.secondary.address" ), secondaryIP ,
230
+ ServerAttributes .SERVER_PORT , Long .valueOf (dp .getPort ()),
231
+ AttributeKey .longKey ("read.size" ), Long .valueOf (readSizeKB *1000 ));
232
+ readSpan .setAllAttributes (attributes );
233
+
207
234
if (connectTimeout < 1 )
208
235
{
209
236
connectTimeout = RowServiceInputStream .DEFAULT_CONNECT_TIMEOUT_MILIS ;
@@ -212,18 +239,24 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
212
239
213
240
if (this .originalRecordDef == null )
214
241
{
215
- throw new Exception ("HpccRemoteFileReader: Provided original record definition is null, original record definition is required." );
242
+ Exception e = new Exception ("HpccRemoteFileReader: Provided original record definition is null, original record definition is required." );
243
+ this .readSpan .recordException (e );
244
+ this .readSpan .end ();
245
+ throw e ;
216
246
}
217
247
218
248
FieldDef projectedRecordDefinition = recBuilder .getRecordDefinition ();
219
249
if (projectedRecordDefinition == null )
220
250
{
221
- throw new Exception ("IRecordBuilder does not have a valid record definition." );
251
+ Exception e = new Exception ("IRecordBuilder does not have a valid record definition." );
252
+ this .readSpan .recordException (e );
253
+ this .readSpan .end ();
254
+ throw e ;
222
255
}
223
256
224
257
if (resumeInfo == null )
225
258
{
226
- this .inputStream = new RowServiceInputStream (this .dataPartition , this .originalRecordDef , projectedRecordDefinition , connectTimeout , limit , createPrefetchThread , readSizeKB , null , false , socketOpTimeoutMs );
259
+ this .inputStream = new RowServiceInputStream (this .dataPartition , this .originalRecordDef , projectedRecordDefinition , connectTimeout , limit , createPrefetchThread , readSizeKB , null , false , socketOpTimeoutMs , readSpan );
227
260
this .binaryRecordReader = new BinaryRecordReader (this .inputStream );
228
261
this .binaryRecordReader .initialize (this .recordBuilder );
229
262
@@ -238,11 +271,14 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
238
271
restartInfo .streamPos = resumeInfo .inputStreamPos ;
239
272
restartInfo .tokenBin = resumeInfo .tokenBin ;
240
273
241
- this .inputStream = new RowServiceInputStream (this .dataPartition , this .originalRecordDef , projectedRecordDefinition , connectTimeout , limit , createPrefetchThread , readSizeKB , restartInfo , false , socketOpTimeoutMs );
274
+ this .inputStream = new RowServiceInputStream (this .dataPartition , this .originalRecordDef , projectedRecordDefinition , connectTimeout , limit , createPrefetchThread , readSizeKB , restartInfo , false , socketOpTimeoutMs , this .readSpan );
275
+
242
276
long bytesToSkip = resumeInfo .recordReaderStreamPos - resumeInfo .inputStreamPos ;
243
277
if (bytesToSkip < 0 )
244
278
{
245
- throw new Exception ("Unable to restart unexpected stream pos in record reader." );
279
+ Exception e = new Exception ("Unable to restart read stream, unexpected stream position in record reader." );
280
+ this .readSpan .recordException (e );
281
+ this .readSpan .end ();
246
282
}
247
283
this .inputStream .skip (bytesToSkip );
248
284
@@ -279,9 +315,11 @@ private boolean retryRead()
279
315
280
316
try
281
317
{
318
+ this .readSpan = Utils .createSpan (readSpanName );
282
319
this .inputStream = new RowServiceInputStream (this .dataPartition , this .originalRecordDef ,
283
320
this .recordBuilder .getRecordDefinition (), this .connectTimeout , this .limit , this .createPrefetchThread ,
284
- this .readSizeKB , restartInfo , false , this .socketOpTimeoutMs );
321
+ this .readSizeKB , restartInfo , false , this .socketOpTimeoutMs , this .readSpan );
322
+
285
323
long bytesToSkip = resumeInfo .recordReaderStreamPos - resumeInfo .inputStreamPos ;
286
324
if (bytesToSkip < 0 )
287
325
{
@@ -294,6 +332,8 @@ private boolean retryRead()
294
332
}
295
333
catch (Exception e )
296
334
{
335
+ this .readSpan .recordException (e );
336
+ this .readSpan .end ();
297
337
log .error ("Failed to retry read for " + this .dataPartition .toString () + " " + e .getMessage (), e );
298
338
return false ;
299
339
}
@@ -499,7 +539,9 @@ public void close() throws Exception
499
539
return ;
500
540
}
501
541
542
+ this .readSpan .end ();
502
543
report ();
544
+
503
545
this .inputStream .close ();
504
546
isClosed = true ;
505
547
0 commit comments