Skip to content

Commit 390d327

Browse files
committed
Rename and cleanup follow-up for chunk synchronization
1 parent bb0d71b commit 390d327

File tree

3 files changed

+50
-22
lines changed

3 files changed

+50
-22
lines changed

splunklib/searchcommands/internals.py

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import os
3636
import re
3737
import sys
38+
import warnings
3839

3940
from . import environment
4041

@@ -505,8 +506,8 @@ def __init__(self, ofile, maxresultrows=None):
505506

506507
self._inspector = OrderedDict()
507508
self._chunk_count = 0
508-
self._record_count = 0
509-
self._total_record_count = 0
509+
self._pending_record_count = 0
510+
self._committed_record_count = 0
510511

511512
@property
512513
def is_flushed(self):
@@ -524,6 +525,30 @@ def ofile(self):
524525
def ofile(self, value):
525526
self._ofile = set_binary_mode(value)
526527

528+
@property
529+
def pending_record_count(self):
530+
return self._pending_record_count
531+
532+
@property
533+
def _record_count(self):
534+
warnings.warn(
535+
"_record_count will be deprecated soon. Use pending_record_count instead.",
536+
PendingDeprecationWarning
537+
)
538+
return self.pending_record_count
539+
540+
@property
541+
def committed_record_count(self):
542+
return self._committed_record_count
543+
544+
@property
545+
def _total_record_count(self):
546+
warnings.warn(
547+
"_total_record_count will be deprecated soon. Use committed_record_count instead.",
548+
PendingDeprecationWarning
549+
)
550+
return self.committed_record_count
551+
527552
def write(self, data):
528553
bytes_type = bytes if sys.version_info >= (3, 0) else str
529554
if not isinstance(data, bytes_type):
@@ -555,7 +580,7 @@ def _clear(self):
555580
self._buffer.seek(0)
556581
self._buffer.truncate()
557582
self._inspector.clear()
558-
self._record_count = 0
583+
self._pending_record_count = 0
559584

560585
def _ensure_validity(self):
561586
if self._finished is True:
@@ -650,9 +675,9 @@ def _write_record(self, record):
650675
values += (repr(value), None)
651676

652677
self._writerow(values)
653-
self._record_count += 1
678+
self._pending_record_count += 1
654679

655-
if self._record_count >= self._maxresultrows:
680+
if self.pending_record_count >= self._maxresultrows:
656681
self.flush(partial=True)
657682

658683
try:
@@ -689,7 +714,7 @@ def flush(self, finished=None, partial=None):
689714

690715
RecordWriter.flush(self, finished, partial) # validates arguments and the state of this instance
691716

692-
if self._record_count > 0 or (self._chunk_count == 0 and 'messages' in self._inspector):
717+
if self.pending_record_count > 0 or (self._chunk_count == 0 and 'messages' in self._inspector):
693718

694719
messages = self._inspector.get('messages')
695720

@@ -727,9 +752,9 @@ def flush(self, finished=None, partial=None):
727752
print(level, text, file=stderr)
728753

729754
self.write(self._buffer.getvalue())
730-
self._clear()
731755
self._chunk_count += 1
732-
self._total_record_count += self._record_count
756+
self._committed_record_count += self.pending_record_count
757+
self._clear()
733758

734759
self._finished = finished is True
735760

@@ -758,25 +783,22 @@ def flush(self, finished=None, partial=None):
758783

759784
def write_chunk(self, finished=None):
760785
inspector = self._inspector
761-
self._total_record_count += self._record_count
786+
self._committed_record_count += self.pending_record_count
762787
self._chunk_count += 1
763788

764789
# TODO: DVPL-6448: splunklib.searchcommands | Add support for partial: true when it is implemented in
765790
# ChunkedExternProcessor (See SPL-103525)
766791
#
767792
# We will need to replace the following block of code with this block:
768793
#
769-
# metadata = [
770-
# ('inspector', self._inspector if len(self._inspector) else None),
771-
# ('finished', finished),
772-
# ('partial', partial)]
794+
# metadata = [item for item in (('inspector', inspector), ('finished', finished), ('partial', partial))]
795+
#
796+
# if partial is True:
797+
# finished = False
773798

774799
if len(inspector) == 0:
775800
inspector = None
776801

777-
#if partial is True:
778-
# finished = False
779-
780802
metadata = [item for item in (('inspector', inspector), ('finished', finished))]
781803
self._write_chunk(metadata, self._buffer.getvalue())
782804
self._clear()
@@ -794,7 +816,7 @@ def write_metric(self, name, value):
794816
self._inspector['metric.' + name] = value
795817

796818
def _clear(self):
797-
RecordWriter._clear(self)
819+
super(RecordWriterV2, self)._clear()
798820
self._fieldnames = None
799821

800822
def _write_chunk(self, metadata, body):

splunklib/searchcommands/search_command.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,6 @@ def _process_protocol_v2(self, argv, ifile, ofile):
776776
# noinspection PyBroadException
777777
try:
778778
debug('Executing under protocol_version=2')
779-
#self._records = self._records_protocol_v2
780779
self._metadata.action = 'execute'
781780
self._execute(ifile, None)
782781
except SystemExit:
@@ -951,9 +950,12 @@ def _execute_v2(self, ifile, process):
951950

952951
def _execute_chunk_v2(self, process, chunk):
953952
metadata, body = chunk
954-
if len(body) > 0:
955-
records = self._read_csv_records(StringIO(body))
956-
self._record_writer.write_records(process(records))
953+
954+
if len(body) <= 0:
955+
return
956+
957+
records = self._read_csv_records(StringIO(body))
958+
self._record_writer.write_records(process(records))
957959

958960

959961
def _report_unexpected_error(self):

tests/searchcommands/test_internals_v2.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,10 @@ def test_record_writer_with_random_data(self, save_recording=False):
229229

230230
self.assertEqual(writer._chunk_count, 0)
231231
self.assertEqual(writer._record_count, 31)
232+
self.assertEqual(writer.pending_record_count, 31)
232233
self.assertGreater(writer._buffer.tell(), 0)
233234
self.assertEqual(writer._total_record_count, 0)
235+
self.assertEqual(writer.committed_record_count, 0)
234236
self.assertListEqual(writer._fieldnames, fieldnames)
235237
self.assertListEqual(writer._inspector['messages'], messages)
236238

@@ -242,16 +244,18 @@ def test_record_writer_with_random_data(self, save_recording=False):
242244

243245
self.assertEqual(writer._chunk_count, 1)
244246
self.assertEqual(writer._record_count, 0)
247+
self.assertEqual(writer.pending_record_count, 0)
245248
self.assertEqual(writer._buffer.tell(), 0)
246249
self.assertEqual(writer._buffer.getvalue(), '')
247250
self.assertEqual(writer._total_record_count, 31)
251+
self.assertEqual(writer.committed_record_count, 31)
248252

249253
self.assertRaises(AssertionError, writer.flush, finished=True, partial=True)
250254
self.assertRaises(AssertionError, writer.flush, finished='non-boolean')
251255
self.assertRaises(AssertionError, writer.flush, partial='non-boolean')
252256
self.assertRaises(AssertionError, writer.flush)
253257

254-
# For SCPv2 we should follow the finish negotiation protocol.
258+
# P2 [ ] TODO: For SCPv2 we should follow the finish negotiation protocol.
255259
# self.assertRaises(RuntimeError, writer.write_record, {})
256260

257261
self.assertFalse(writer._ofile.closed)

0 commit comments

Comments
 (0)