Introduce .bin.ndjson format #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed · wants to merge 2 commits
115 changes: 115 additions & 0 deletions tritonparse/decompress_bin_ndjson.py
@@ -0,0 +1,115 @@
#!/usr/bin/env python3
"""
Script to decompress .bin.ndjson files back to regular .ndjson format.

The .bin.ndjson format stores each JSON record as a separate gzip member,
concatenated in sequence within a single binary file. This script uses
gzip.open() which automatically handles member concatenation to read
the compressed file and write out the original NDJSON format.
"""

import argparse
import gzip
import sys
from pathlib import Path
from typing import Optional


def decompress_bin_ndjson(input_file: str, output_file: Optional[str] = None) -> None:
"""
Decompress a .bin.ndjson file to regular .ndjson format.

Args:
input_file: Path to the .bin.ndjson file
output_file: Path for the output .ndjson file (optional)
"""
input_path = Path(input_file)

# Validate input file
    if not input_path.exists():
        print(f"Error: Input file '{input_file}' does not exist", file=sys.stderr)
        sys.exit(1)

    if not input_path.name.endswith('.bin.ndjson'):
print(f"Warning: Input file '{input_file}' doesn't have .bin.ndjson extension")

# Determine output file path
if output_file is None:
if input_path.name.endswith('.bin.ndjson'):
# Replace .bin.ndjson with .ndjson
output_file = str(input_path.with_suffix('').with_suffix('.ndjson'))
else:
# Add .decompressed.ndjson suffix
output_file = str(input_path.with_suffix('.decompressed.ndjson'))

output_path = Path(output_file)

try:
line_count = 0
with gzip.open(input_path, 'rt', encoding='utf-8') as compressed_file:
with open(output_path, 'w', encoding='utf-8') as output:
for line in compressed_file:
# gzip.open automatically handles member concatenation
# Each line is already a complete JSON record with newline
output.write(line)
line_count += 1

# Get file sizes for comparison
input_size = input_path.stat().st_size
output_size = output_path.stat().st_size
compression_ratio = (1 - input_size / output_size) * 100 if output_size > 0 else 0

print(f"Successfully decompressed '{input_file}' to '{output_file}'")
print(f" Input size: {input_size:,} bytes")
print(f" Output size: {output_size:,} bytes")
print(f" Compression ratio: {compression_ratio:.1f}%")
print(f" Records processed: {line_count:,}")

    except gzip.BadGzipFile as e:
        print(f"Error: Invalid gzip format in '{input_file}': {e}", file=sys.stderr)
        sys.exit(1)
    except UnicodeDecodeError as e:
        print(f"Error: Unicode decode error in '{input_file}': {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error: Failed to decompress '{input_file}': {e}", file=sys.stderr)
        sys.exit(1)


def main():
parser = argparse.ArgumentParser(
description="Decompress .bin.ndjson files to regular .ndjson format",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s trace.bin.ndjson
%(prog)s trace.bin.ndjson -o output.ndjson
%(prog)s /logs/dedicated_log_triton_trace_user_.bin.ndjson
"""
)

parser.add_argument(
'input_file',
help='Input .bin.ndjson file to decompress'
)

parser.add_argument(
'-o', '--output',
help='Output .ndjson file path (default: replace .bin.ndjson with .ndjson)'
)

parser.add_argument(
'-v', '--verbose',
action='store_true',
help='Enable verbose output'
)

args = parser.parse_args()

if args.verbose:
print(f"Decompressing: {args.input_file}")
if args.output:
print(f"Output file: {args.output}")

decompress_bin_ndjson(args.input_file, args.output)


if __name__ == "__main__":
main()
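
For reference, the .bin.ndjson layout this script consumes is simply a sequence of independent gzip members, one per NDJSON record, appended to a single binary file. A minimal round-trip sketch (file name and record contents are illustrative, not part of this PR):

```python
import gzip
import io
import json

records = [{"event": "compilation", "id": 1}, {"event": "launch", "id": 2}]

# Write: each record becomes its own gzip member, appended in binary mode.
with open("example.bin.ndjson", "ab") as f:
    for record in records:
        line = json.dumps(record, separators=(",", ":")) + "\n"
        buffer = io.BytesIO()
        with gzip.GzipFile(fileobj=buffer, mode="wb") as gz:
            gz.write(line.encode("utf-8"))
        f.write(buffer.getvalue())

# Read: gzip.open() transparently decodes the concatenated members
# as one continuous text stream, yielding the original NDJSON lines.
with gzip.open("example.bin.ndjson", "rt", encoding="utf-8") as f:
    for line in f:
        print(json.loads(line))
```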
29 changes: 24 additions & 5 deletions tritonparse/extract_source_mappings.py
@@ -14,6 +14,7 @@
"""

import argparse
import gzip
import json
import logging
import os
@@ -555,15 +556,33 @@ def parse_single_file(
# Set default output directory if not provided
output_dir = output_dir or os.path.dirname(file_path)

-    with open(file_path, "r") as f:
+    # Check if input file is compressed based on file extension
+    is_compressed_input = file_path.endswith('.bin.ndjson')
+
+    # Open file in appropriate mode - use gzip.open for compressed files
+    if is_compressed_input:
+        # Use gzip.open which automatically handles member concatenation
+        file_handle = gzip.open(file_path, 'rt', encoding='utf-8')
+    else:
+        file_handle = open(file_path, "r")
+
+    with file_handle as f:
file_name = os.path.basename(file_path)
-        file_name_without_extension = os.path.splitext(file_name)[0]
+        # Handle .bin.ndjson extension properly
+        if is_compressed_input:
+            file_name_without_extension = file_name[:-11]  # Remove .bin.ndjson
+        else:
+            file_name_without_extension = os.path.splitext(file_name)[0]

+        # Process lines uniformly for both compressed and uncompressed files
        for i, line in enumerate(f):
            logger.debug(f"Processing line {i + 1} in {file_path}")
-            # Skip empty lines
-            if not line.strip():
+            json_str = line.strip()
+            if not json_str:
                continue
-            parsed_line = parse_single_trace_content(line)
+            parsed_line = parse_single_trace_content(json_str)
if not parsed_line:
logger.warning(f"Failed to parse line {i + 1} in {file_path}")
continue
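
The change above lets parse_single_file read either plain .ndjson or the new .bin.ndjson input, selected by file extension. A standalone sketch of the same pattern, for readers who want to reuse it elsewhere (the helper name open_trace_file and the example path are illustrative, not part of this PR):

```python
import gzip
from typing import IO


def open_trace_file(file_path: str) -> IO[str]:
    """Open a trace file as text, transparently handling .bin.ndjson.

    gzip.open() reads concatenated gzip members as a single text stream,
    so both formats can be iterated line by line in the same way.
    """
    if file_path.endswith(".bin.ndjson"):
        return gzip.open(file_path, "rt", encoding="utf-8")
    return open(file_path, "r", encoding="utf-8")


# Usage with a hypothetical path:
# with open_trace_file("/tmp/trace.bin.ndjson") as f:
#     for line in f:
#         process(line.strip())
```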
30 changes: 26 additions & 4 deletions tritonparse/structured_logging.py
@@ -1,6 +1,7 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.

import atexit
import gzip
import importlib
import inspect
import json
@@ -22,6 +23,8 @@
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB limit for file content extraction
# Enable ndjson output. json format is only for debugging purpose.
TRITONPARSE_NDJSON = os.getenv("TRITONPARSE_NDJSON", "1") in ["1", "true", "True"]
# Enable gzip compression for each line in trace files
TRITON_TRACE_GZIP = os.getenv("TRITON_TRACE_GZIP", "0") in ["1", "true", "True"]
triton_trace_log = logging.getLogger("tritonparse_trace")
# The folder to store the triton trace log.
triton_trace_folder = os.environ.get("TRITON_TRACE", None)
@@ -313,7 +316,8 @@ def format(self, record: logging.LogRecord):
return json.dumps(clean_log_entry, indent=2)
else:
# NDJSON format requires a newline at the end of each line
-            return json.dumps(clean_log_entry, separators=(",", ":")) + "\n"
+            json_str = json.dumps(clean_log_entry, separators=(",", ":"))
+            return json_str + "\n"


class TritonTraceHandler(logging.StreamHandler):
@@ -394,12 +398,19 @@ def emit(self, record):
ranksuffix = f"rank_{dist.get_rank()}_"
filename = f"{self.prefix}{ranksuffix}"
self._ensure_stream_closed()
+                    # Choose file extension and mode based on compression setting
+                    if TRITON_TRACE_GZIP:
+                        file_extension = ".bin.ndjson"
+                        file_mode = "ab+"  # Binary mode for gzip member concatenation
+                    else:
+                        file_extension = ".ndjson"
+                        file_mode = "a+"
                    log_file_name = os.path.abspath(
-                        os.path.join(root_dir, f"{filename}.ndjson")
+                        os.path.join(root_dir, f"{filename}{file_extension}")
                    )
                    self.stream = open(
                        log_file_name,
-                        mode="a+",
+                        mode=file_mode,
)
log.debug("TritonTraceHandler: logging to %s", log_file_name)
else:
@@ -408,7 +419,18 @@ def emit(self, record):

if self.stream:
formatted = self.format(record)
-                self.stream.write(formatted)
+                if TRITON_TRACE_GZIP:
+                    # Create a separate gzip member for each record
+                    # This allows standard gzip readers to handle member concatenation automatically
+                    import io
+                    buffer = io.BytesIO()
+                    with gzip.GzipFile(fileobj=buffer, mode='wb') as gz:
+                        gz.write(formatted.encode('utf-8'))
+                    # Write the complete gzip member to the file
+                    compressed_data = buffer.getvalue()
+                    self.stream.write(compressed_data)
+                else:
+                    self.stream.write(formatted)
self.flush()
except Exception as e:
# record exception and ensure resources are released
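
Taken together, the compressed tracing mode is opt-in via the TRITON_TRACE_GZIP environment variable, which structured_logging.py reads at import time. A hedged end-to-end sketch (paths are illustrative; how the Triton workload itself is set up is outside this diff):

```python
import os

# Set before tritonparse.structured_logging is imported, since
# TRITON_TRACE_GZIP is evaluated at module import time.
os.environ["TRITON_TRACE_GZIP"] = "1"              # handler writes <prefix>.bin.ndjson
os.environ["TRITON_TRACE"] = "/tmp/triton_traces"  # trace output folder

# ... import tritonparse, enable its logging, and run the Triton workload ...

# Afterwards, restore plain NDJSON with the script added in this PR:
#   python tritonparse/decompress_bin_ndjson.py /tmp/triton_traces/<trace>.bin.ndjson
```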