Hi,
I want to use this named entity recognition model (https://sparknlp.org/2020/01/22/glove_100d.html) inside an AWS Glue script. The problem is that I cannot download the model inside the Glue environment; it gives an error. My code is as follows.
First, I tried this code:
```python
import sys
import time
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Import Spark NLP components
from sparknlp import *
from sparknlp.annotator import *
from sparknlp.base import *
from pyspark.ml import Pipeline

def log_time(message, start_time=None):
    if start_time:
        elapsed = time.time() - start_time
        print(f"{message}: {elapsed:.2f} seconds")
        return time.time()
    else:
        print(message)
        return time.time()

# Job setup
job_start = log_time("Starting job")
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '1'
#spark.conf.set("spark.jsl.settings.pretrained.cache_folder", "hdfs:///tmp/cache_pretrained")
#spark.conf.set("spark.jsl.settings.storage.cluster_tmp_dir", "hdfs:///tmp/spark_tmp")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

import subprocess

# Create directories (I tried with these configs and without them as well)
#subprocess.run(["hadoop", "fs", "-mkdir", "-p", "/tmp/cache_pretrained"])
#subprocess.run(["hadoop", "fs", "-mkdir", "-p", "/tmp/spark_tmp"])
#subprocess.run(["hadoop", "fs", "-chmod", "777", "/tmp/cache_pretrained"])
#subprocess.run(["hadoop", "fs", "-chmod", "777", "/tmp/spark_tmp"])

print("Building Spark NLP pipeline with onto_100 model...")
model_start = time.time()

documentAssembler = DocumentAssembler() \
    .setInputCol('text') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

# Load embeddings and NER model
embeddings = WordEmbeddingsModel.pretrained('glove_100d') \
    .setInputCols(['document', 'token']) \
    .setOutputCol('embeddings')

ner_model = NerDLModel.pretrained('onto_100', 'en') \
    .setInputCols(['document', 'token', 'embeddings']) \
    .setOutputCol('ner')

ner_converter = NerConverter() \
    .setInputCols(['document', 'token', 'ner']) \
    .setOutputCol('ner_chunk')

pipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    embeddings,
    ner_model,
    ner_converter
])
log_time("Pipeline build complete", model_start)

# Read source data
read_start = log_time("Reading input CSV from S3")
input_dynamic_frame = glueContext.create_dynamic_frame.from_options(
    format_options={"withHeader": True, "separator": ",", "optimizePerformance": False},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://test/test.csv"], "recurse": True},
    transformation_ctx="AmazonS3_node"
)
df = input_dynamic_frame.toDF()
print('before filtering count:', df.count())
log_time("Read complete", read_start)

df_with_index = df.withColumn("original_index", monotonically_increasing_id())

string_types = [StringType, VarcharType, CharType]
numerical_types = [ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType]
acceptable_types = string_types + numerical_types
str_cols = [field.name for field in df_with_index.schema.fields if any(isinstance(field.dataType, t) for t in acceptable_types)]
if not str_cols:
    print("No string columns found")
    job.commit()
    exit()

first_col = str_cols[0]
print(f"Processing column: {first_col}")

# Filter data
filtered_df = df_with_index.select("*", col(first_col).alias("text")) \
    .filter(col("text").isNotNull() & (col("text") != ""))
print('after filtering count:', filtered_df.count())
repartitioned_df = filtered_df

# Fit and transform the pipeline
ner_start = log_time("Applying onto_100 Spark NLP pipeline")
df_ner = pipeline.fit(repartitioned_df).transform(repartitioned_df)

if "ner_chunk" not in df_ner.columns:
    print("No ner_chunk column found")
    job.commit()
    exit()

df_with_entities = df_ner.withColumn(
    "person_entities",
    filter(col("ner_chunk"),
           lambda c: coalesce(c.getItem("metadata").getItem("entity"), lit("UNKNOWN")) == "PERSON")
).withColumn(
    "detected_names",
    when(size(col("person_entities")) > 0,
         concat_ws(", ",
                   transform(col("person_entities"),
                             lambda c: c.getItem("result"))))
    .otherwise(lit(""))
)

original_columns = [col_name for col_name in df_with_index.columns if col_name not in ["original_index", "text"]]
result_columns = original_columns + ["detected_names"]
df_result = df_with_entities.select(*result_columns, "original_index") \
    .orderBy("original_index") \
    .drop("original_index")
repartitioned_df.unpersist()
log_time("NER processing complete", ner_start)

# Write results to S3
write_start = log_time("Writing results to S3")
result_dynamic_frame = DynamicFrame.fromDF(df_result, glueContext, "result")
glueContext.write_dynamic_frame.from_options(
    frame=result_dynamic_frame,
    connection_type="s3",
    format="glueparquet",
    connection_options={"path": "s3://test/test", "partitionKeys": []},
    format_options={"compression": "snappy"},
    transformation_ctx="output"
)
log_time("Write complete", write_start)
log_time("Job complete", job_start)
job.commit()
```
It starts downloading the model files; the onto_100 model downloads without issues, but when it comes to the GloVe embeddings it gives this error:

```
Category: RESOURCE_NOT_FOUND_ERROR; Failed Line Number: 67; An error occurred while calling z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.downloadModel. File file:/home/hadoop/cache_pretrained/glove_100d_en_2.4.0_2.4_1579690104032/metadata/part-00000 does not exist
```
I think the issue is that my code is trying to load the model from the local file system (file:/home/...), but we are running on AWS Glue, which doesn't allow that.
To solve this, I tried downloading the model directly from your Spark NLP S3 location and loading it offline, following this guide: https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/SparkNLP_offline_installation.ipynb.
I also tried uploading the model files to my own S3 bucket and loading them from there, but that did not work either.
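For reference, the offline loading attempt looked roughly like this (the S3 paths below are placeholders for my own bucket, and I'm assuming the `.load()` method on the pretrained model classes is the right way to point at an unpacked model folder):

```python
# Rough sketch of the offline loading attempt (paths are placeholders for my bucket,
# which contains the unpacked model folders downloaded from the Spark NLP S3 location).
from sparknlp.annotator import WordEmbeddingsModel, NerDLModel

embeddings = WordEmbeddingsModel.load("s3://my-bucket/models/glove_100d_en/") \
    .setInputCols(['document', 'token']) \
    .setOutputCol('embeddings')

ner_model = NerDLModel.load("s3://my-bucket/models/onto_100_en/") \
    .setInputCols(['document', 'token', 'embeddings']) \
    .setOutputCol('ner')
```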
I also tried these configs for my Glue job:

```python
#spark.conf.set("spark.jsl.settings.pretrained.cache_folder", "hdfs:///tmp/cache_pretrained")
#spark.conf.set("spark.jsl.settings.storage.cluster_tmp_dir", "hdfs:///tmp/spark_tmp")
```

but that did not solve the problem either.
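As a variation, I also considered applying these settings before the SparkContext is created rather than on the already-running session, roughly like the sketch below (the paths are placeholders, and I'm not sure whether Glue honours a custom SparkConf this way):

```python
# Sketch of setting the Spark NLP cache/tmp locations on the SparkConf before the
# context exists (paths are placeholders; unverified on Glue).
from pyspark import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext

conf = SparkConf() \
    .set("spark.jsl.settings.pretrained.cache_folder", "s3://my-bucket/cache_pretrained") \
    .set("spark.jsl.settings.storage.cluster_tmp_dir", "s3://my-bucket/spark_tmp")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
```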
I also tried fitting the pipeline in Google Colab and saving it with:

```python
fitted_pipeline = pipeline.fit(sample_data)
fitted_pipeline.write().overwrite().save("my_nlp_pipeline")
```

Then I uploaded the saved pipeline to my S3 bucket and tried to load it directly from there, but that did not work either.
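The loading side of that attempt looked roughly like this (the S3 path is a placeholder for where I uploaded the saved pipeline):

```python
# Rough sketch of loading the pipeline saved from Colab inside the Glue job
# (the S3 path is a placeholder for my bucket).
from pyspark.ml import PipelineModel

loaded_pipeline = PipelineModel.load("s3://my-bucket/my_nlp_pipeline")
df_ner = loaded_pipeline.transform(repartitioned_df)
```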
I also tried the exact same code with a different model (https://sparknlp.org/2021/03/23/onto_recognize_entities_bert_tiny_en.html) by using `pipeline = PretrainedPipeline('onto_recognize_entities_bert_tiny', lang='en')`, and that model worked.
So could you please give me some guidance on how to solve this problem?