Description
Describe the bug
I am having trouble with data post-processing in AWS SageMaker, where I need to split one large text file with predictions (~2-10 GB) into millions of small files (one file per user, ~3-10 KB each).
I've been able to process a small dataset (32 MB, 13540 records). When I try 1.2 million records (2.2 GB), ScriptProcessor successfully processes the input file and saves the output files to /opt/ml/processing/output; however, it then fails to upload them to S3 with an InternalServerError.
To reproduce
Jupyter notebook:
import boto3
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput, NetworkConfig

role = get_execution_role()
instance_type = 'ml.m4.4xlarge'
ecr_image_full_name = '0123456789.dkr.ecr.eu-central.amazonaws.com/maslick-sagemaker-processing-image:latest'
input_file = 'input.csv'
input_object = 's3://my-awesome-dataset/input.csv'
output_object = 's3://my-awesome-results'

network_config = NetworkConfig(enable_network_isolation=False,
                               subnets=["subnet-12345", "subnet-67890"],
                               security_group_ids=["sg-0123456789"])

script_processor = ScriptProcessor(role=role,
                                   image_uri=ecr_image_full_name,
                                   command=['python3'],
                                   instance_count=1,
                                   instance_type=instance_type)

input = ProcessingInput(source=input_object, destination='/opt/ml/processing/input')
output = ProcessingOutput(source='/opt/ml/processing/output', destination=output_object)
script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])
callable.py:
import hashlib
import json
import sys
from collections import defaultdict
from concurrent.futures.process import ProcessPoolExecutor
from pathlib import Path

import pandas as pd


def saveFilesMultiProcesses(items):
    # Write one JSON file per item, spread across worker processes.
    with ProcessPoolExecutor() as executor:
        for item in items:
            executor.submit(saveFile, item)


def readCsv(input_file):
    # The input is a pipe-separated file without a header row.
    colnames = ['id', 'article', 'type', 'rank']
    df = pd.read_csv('/opt/ml/processing/input/{}'.format(input_file), sep='|', names=colnames)
    return df


def processCsv(df):
    # Turn each row into a dict that can be dumped as JSON.
    dicts = []
    for row in df.itertuples():
        record = defaultdict(lambda: defaultdict(list))
        record["id"] = row.id
        record["article"] = row.article
        record["type"] = row.type
        record["rank"] = row.rank
        dicts.append(record)
    return dicts


def saveFile(item):
    # Prefix the file name with the first 5 hex characters of the MD5 of the id.
    hashed_prefix = hashlib.md5(str(item['id']).encode('utf-8')).hexdigest()
    short = hashed_prefix[:5]
    file_name = short + "_" + str(item['id']) + "_latest.json"
    outfile = Path('/opt/ml/processing/output', file_name)
    with open(outfile, 'w') as json_file:
        json.dump(item, json_file)


if __name__ == '__main__':
    input_file = sys.argv[1]
    df = readCsv(input_file)
    list_of_dicts = processCsv(df)
    saveFilesMultiProcesses(list_of_dicts)
    print("Done. Wait until all files are saved to S3")
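One thing worth checking before blaming the upload step: `executor.submit` returns a Future, and an exception raised in a worker (or while an item is being pickled for a worker) is stored on that Future instead of being raised in the parent process. Since `saveFilesMultiProcesses` discards the Futures, such failures would go unnoticed. The following is a minimal sketch of how they could be collected. The names `save_all`, `save_fn`, and `executor_cls` are hypothetical and not part of the script above; the default executor is thread-based only to keep the sketch portable, and keeping one Future per item in memory may be too costly for millions of items without batching.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def save_all(items, save_fn, executor_cls=ThreadPoolExecutor):
    """Run save_fn over items and return a list of (item, exception) failures."""
    failures = []
    with executor_cls() as executor:
        # Map each Future back to its item so failures can be reported.
        futures = {executor.submit(save_fn, item): item for item in items}
        for future in as_completed(futures):
            error = future.exception()  # None when the task succeeded
            if error is not None:
                failures.append((futures[future], error))
    return failures
```

Passing `executor_cls=ProcessPoolExecutor` and `save_fn=saveFile` would presumably mirror the original setup; an empty result would then suggest that every file was written locally.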
Dockerfile:
FROM python:3.7-slim-buster
RUN pip3 install pandas==0.25.3
ENV PYTHONUNBUFFERED=TRUE
Expected behavior
All files that I save to /opt/ml/processing/output should be saved to S3.
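To make that expectation checkable, here is a small sketch that compares the files in a local output directory against a list of S3 object keys. The function name `find_missing_uploads` is hypothetical, and the sketch assumes the keys were gathered separately (for example with boto3's `list_objects_v2` paginator) and that the local directory, or a copy of its listing, is still available.

```python
from pathlib import Path


def find_missing_uploads(output_dir, uploaded_keys):
    """Return local file names that have no matching name among the uploaded keys."""
    # Compare on the final path component so key prefixes do not matter.
    uploaded_names = {key.rsplit("/", 1)[-1] for key in uploaded_keys}
    local_names = (p.name for p in Path(output_dir).iterdir() if p.is_file())
    return sorted(name for name in local_names if name not in uploaded_names)
```

An empty list would mean every local file has a counterpart in S3.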
Screenshots or logs
---------------------------------------------------------------------------
UnexpectedStatusException Traceback (most recent call last)
<ipython-input-66-48dccaef0bee> in <module>()
----> 1 script_processor.run(code='callable.py', inputs=[input], outputs=[output], arguments=[input_file])
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in run(self, code, inputs, outputs, arguments, wait, logs, job_name, experiment_config)
402 self.jobs.append(self.latest_job)
403 if wait:
--> 404 self.latest_job.wait(logs=logs)
405
406 def _get_user_code_name(self, code):
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/processing.py in wait(self, logs)
726 """
727 if logs:
--> 728 self.sagemaker_session.logs_for_processing_job(self.job_name, wait=True)
729 else:
730 self.sagemaker_session.wait_for_processing_job(self.job_name)
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in logs_for_processing_job(self, job_name, wait, poll)
3132
3133 if wait:
-> 3134 self._check_job_status(job_name, description, "ProcessingJobStatus")
3135 if dot:
3136 print()
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/session.py in _check_job_status(self, job, desc, status_key_name)
2636 ),
2637 allowed_statuses=["Completed", "Stopped"],
-> 2638 actual_status=status,
2639 )
2640
UnexpectedStatusException: Error for Processing job maslick-sagemaker-processing-image-2020-06-11-15-42-34-593: Failed. Reason: InternalServerError: We encountered an internal error. Please try again.
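The SDK exception only repeats the generic failure reason. As a sketch, the job's status and any further failure details can be read back through the SageMaker `DescribeProcessingJob` API. The helper name `summarize_processing_failure` is hypothetical; it expects a boto3 SageMaker client (or any object with a compatible `describe_processing_job` method), and there is no guarantee that the response holds more than the message shown above.

```python
def summarize_processing_failure(sagemaker_client, job_name):
    """Return the status and failure details reported for a processing job."""
    desc = sagemaker_client.describe_processing_job(ProcessingJobName=job_name)
    return {
        "status": desc.get("ProcessingJobStatus"),
        "failure_reason": desc.get("FailureReason"),  # absent unless the job failed
        "exit_message": desc.get("ExitMessage"),      # optional message from the container
    }
```

With boto3 installed, it could be called as `summarize_processing_failure(boto3.client("sagemaker"), "maslick-sagemaker-processing-image-2020-06-11-15-42-34-593")`.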
System information
- SageMaker Python SDK version: 1.50.17
- Framework name (eg. PyTorch) or algorithm (eg. KMeans): pandas
- Framework version: 0.25.3
- Python version: 3.7.4
- CPU or GPU: CPU
- Custom Docker image (Y/N): Y
Additional context
See my Stack Overflow question for more details.