@@ -639,42 +639,43 @@ async def _check_job_content_match(
639639 # Handle different label formats
640640 if platform in ['x' , 'twitter' ]:
641641 # For X: check if label is in tweet_hashtags array (handles multiple hashtags)
642- tweet_hashtags = row .get ('tweet_hashtags' , [])
643-
644- # Handle pandas types - tweet_hashtags might be a list, numpy array, or Series
645- if pd .notna (tweet_hashtags ) and hasattr (tweet_hashtags , '__iter__' ) and not isinstance (tweet_hashtags , str ):
646- # Convert to list if it's array-like (handles pandas Series, numpy arrays, lists)
647- try :
648- hashtags_list = list (tweet_hashtags ) if not isinstance (tweet_hashtags , list ) else tweet_hashtags
649- # Normalize hashtags to lowercase
650- hashtags_lower = [str (h ).lower ().strip () for h in hashtags_list ]
651- label_without_hash = job_label_normalized .lstrip ('#' )
652- label_with_hash = f"#{ label_without_hash } "
653-
654- # Check if job label is in the hashtags array
655- label_matches = (
656- label_with_hash in hashtags_lower or
657- label_without_hash in hashtags_lower or
658- # Fallback: check main label field
659- entity_label == label_with_hash or
660- entity_label == label_without_hash
661- )
662- except (TypeError , ValueError ):
663- # If conversion fails, fall back to label field check
664- label_without_hash = job_label_normalized .lstrip ('#' )
665- label_with_hash = f"#{ label_without_hash } "
666- label_matches = (
667- entity_label == label_with_hash or
668- entity_label == label_without_hash
669- )
670- else :
671- # Fallback if tweet_hashtags is not iterable or is null
672- label_without_hash = job_label_normalized .lstrip ('#' )
673- label_with_hash = f"#{ label_without_hash } "
674- label_matches = (
675- entity_label == label_with_hash or
676- entity_label == label_without_hash
677- )
642+ tweet_hashtags_raw = row .get ('tweet_hashtags' , None )
643+
644+ hashtags_list = []
645+ if tweet_hashtags_raw is not None :
646+ # If it's an iterable (list / array / Series), but not a string
647+ if hasattr (tweet_hashtags_raw , '__iter__' ) and not isinstance (tweet_hashtags_raw , str ):
648+ try :
649+ hashtags_list = list (tweet_hashtags_raw )
650+ except TypeError :
651+ hashtags_list = []
652+ else :
653+ # Scalar value: drop NaN/NA if possible
654+ try :
655+ if not pd .isna (tweet_hashtags_raw ):
656+ hashtags_list = [tweet_hashtags_raw ]
657+ except Exception :
658+ hashtags_list = [tweet_hashtags_raw ]
659+
660+ # Normalize hashtags to lowercase
661+ hashtags_lower = [
662+ str (h ).lower ().strip ()
663+ for h in hashtags_list
664+ if h is not None and str (h ).strip () != ''
665+ ]
666+
667+ label_without_hash = job_label_normalized .lstrip ('#' )
668+ label_with_hash = f"#{ label_without_hash } "
669+
670+ # Check if job label is in the hashtags array
671+ label_matches = (
672+ label_with_hash in hashtags_lower or
673+ label_without_hash in hashtags_lower or
674+ # Fallback: check main label field
675+ entity_label == label_with_hash or
676+ entity_label == label_without_hash
677+ )
678+
678679 elif platform == 'reddit' :
679680 # For Reddit: check with and without r/ prefix
680681 label_matches = (
@@ -736,7 +737,10 @@ async def _check_job_content_match(
736737 # Record mismatch sample
737738 if len (mismatch_samples ) < 10 :
738739 mismatch_samples .append (
739- f"Job { job_id [:8 ]} : Required label={ job_label or 'any' } keyword={ job_keyword or 'any' } , label_matched={ label_matches } keyword_matched={ keyword_matches } - { uri or 'unknown' } "
740+ f"Job { job_id [:8 ]} : Required label={ job_label or 'any' } "
741+ f"keyword={ job_keyword or 'any' } , "
742+ f"label_matched={ label_matches } keyword_matched={ keyword_matches } "
743+ f"- { uri or 'unknown' } "
740744 )
741745
742746 except Exception as e :
@@ -747,7 +751,8 @@ async def _check_job_content_match(
747751
748752 # Log job match check details (same style as regular validation)
749753 bt .logging .info (
750- f"{ miner_hotkey } : S3 job match validation: Checked { total_checked } entities, { total_matched } matched job requirements ({ match_rate :.1f} %)"
754+ f"{ miner_hotkey } : S3 job match validation: Checked { total_checked } entities, "
755+ f"{ total_matched } matched job requirements ({ match_rate :.1f} %)"
751756 )
752757 if checked_uris :
753758 bt .logging .info (f"{ miner_hotkey } : S3 job match: Sample URIs checked: { checked_uris [:5 ]} " )
def _create_twitter_data_entity(self, row) -> Optional[DataEntity]:
    """Create an X/Twitter DataEntity from a parquet row.

    Returns None when mandatory fields are missing or any conversion
    fails — a malformed row is skipped rather than failing the batch.

    Args:
        row: mapping-like parquet row (supports ``.get``).

    Returns:
        Optional[DataEntity]: the entity, or None on any problem.
    """
    try:
        username = row.get('username', '')
        text = row.get('text', '')
        url = row.get('url', row.get('uri', ''))

        # username/text/url are mandatory; skip incomplete rows.
        if not all([username, text, url]):
            return None

        # Parse the timestamp; fall back to current UTC time whenever
        # it is missing or unparseable.
        # NOTE(review): the exact original parse expression is not
        # visible here — pd.to_datetime(..., utc=True) preserves the
        # visible contract (parse or fall back to now) — confirm.
        timestamp = dt.datetime.now(dt.timezone.utc)
        datetime_val = row.get('datetime', row.get('timestamp', ''))
        if datetime_val and pd.notna(datetime_val):
            try:
                timestamp = pd.to_datetime(datetime_val, utc=True).to_pydatetime()
            except Exception:
                pass  # keep the "now" fallback

        # ---- safe normalization helpers ---------------------------
        def _is_missing(value) -> bool:
            # Scalar-safe NaN/None test; pd.isna raises/returns arrays
            # for array-likes, so treat those as "present".
            try:
                return bool(pd.isna(value))
            except Exception:
                return False

        def _opt_str(key) -> Optional[str]:
            # Optional string column: None when missing/NaN.
            value = row.get(key, None)
            return str(value) if not _is_missing(value) else None

        def _opt_int(key) -> Optional[int]:
            # Optional integer column: None when missing/NaN.
            value = row.get(key, None)
            return int(value) if not _is_missing(value) else None

        # tweet_hashtags may be list/array/Series, a scalar, or NaN.
        raw_hashtags = row.get('tweet_hashtags', None)
        if raw_hashtags is None:
            tweet_hashtags = []
        elif hasattr(raw_hashtags, '__iter__') and not isinstance(raw_hashtags, (str, bytes)):
            # Iterable (list, numpy array, Series, ...) — but never
            # explode str/bytes into characters/ints.
            try:
                tweet_hashtags = list(raw_hashtags)
            except TypeError:
                tweet_hashtags = []
        else:
            # Scalar: drop NaN/NA when pandas can decide.
            tweet_hashtags = [] if _is_missing(raw_hashtags) else [raw_hashtags]

        # media can be non-scalar; only clear an obvious scalar NaN.
        raw_media = row.get('media', None)
        media_value = None if (isinstance(raw_media, float) and math.isnan(raw_media)) else raw_media
        # -----------------------------------------------------------

        # Create XContent object with ALL uploaded fields.
        x_content = XContent(
            username=str(username),
            text=str(text),
            url=str(url),
            timestamp=timestamp,
            tweet_hashtags=tweet_hashtags,
            media=media_value,
            user_id=str(row.get('user_id', '')),
            # user_display_name is OPTIONAL (None when NaN/missing).
            user_display_name=_opt_str('user_display_name'),
            user_verified=bool(row.get('user_verified', False)),
            tweet_id=str(row.get('tweet_id', '')),
            is_reply=bool(row.get('is_reply', False)),
            is_quote=bool(row.get('is_quote', False)),
            conversation_id=str(row.get('conversation_id', '')),
            in_reply_to_user_id=str(row.get('in_reply_to_user_id', '')),
            # Engagement metrics that miners upload.
            language=_opt_str('language'),
            in_reply_to_username=_opt_str('in_reply_to_username'),
            quoted_tweet_id=_opt_str('quoted_tweet_id'),
            like_count=_opt_int('like_count'),
            retweet_count=_opt_int('retweet_count'),
            reply_count=_opt_int('reply_count'),
            quote_count=_opt_int('quote_count'),
            view_count=_opt_int('view_count'),
            bookmark_count=_opt_int('bookmark_count'),
            # User profile data that miners upload.
            user_blue_verified=(
                bool(row.get('user_blue_verified', False))
                if not _is_missing(row.get('user_blue_verified', None))
                else None
            ),
            user_description=_opt_str('user_description'),
            user_location=_opt_str('user_location'),
            profile_image_url=_opt_str('profile_image_url'),
            cover_picture_url=_opt_str('cover_picture_url'),
            user_followers_count=_opt_int('user_followers_count'),
            user_following_count=_opt_int('user_following_count'),
        )

        return XContent.to_data_entity(x_content)
    except Exception:
        # Any malformed row is skipped rather than aborting validation.
        return None
1264+
12311265 def _create_reddit_data_entity (self , row ) -> Optional [DataEntity ]:
12321266 """Create Reddit DataEntity from parquet row"""
12331267 try :
0 commit comments