158 changes: 96 additions & 62 deletions vali_utils/s3_utils.py
@@ -544,42 +544,43 @@ async def _check_job_content_match(
# Handle different label formats
if platform in ['x', 'twitter']:
# For X: check if label is in tweet_hashtags array (handles multiple hashtags)
tweet_hashtags = row.get('tweet_hashtags', [])

# Handle pandas types - tweet_hashtags might be a list, numpy array, or Series
if pd.notna(tweet_hashtags) and hasattr(tweet_hashtags, '__iter__') and not isinstance(tweet_hashtags, str):
# Convert to list if it's array-like (handles pandas Series, numpy arrays, lists)
try:
hashtags_list = list(tweet_hashtags) if not isinstance(tweet_hashtags, list) else tweet_hashtags
# Normalize hashtags to lowercase
hashtags_lower = [str(h).lower().strip() for h in hashtags_list]
label_without_hash = job_label_normalized.lstrip('#')
label_with_hash = f"#{label_without_hash}"

# Check if job label is in the hashtags array
label_matches = (
label_with_hash in hashtags_lower or
label_without_hash in hashtags_lower or
# Fallback: check main label field
entity_label == label_with_hash or
entity_label == label_without_hash
)
except (TypeError, ValueError):
# If conversion fails, fall back to label field check
label_without_hash = job_label_normalized.lstrip('#')
label_with_hash = f"#{label_without_hash}"
label_matches = (
entity_label == label_with_hash or
entity_label == label_without_hash
)
else:
# Fallback if tweet_hashtags is not iterable or is null
label_without_hash = job_label_normalized.lstrip('#')
label_with_hash = f"#{label_without_hash}"
label_matches = (
entity_label == label_with_hash or
entity_label == label_without_hash
)
tweet_hashtags_raw = row.get('tweet_hashtags', None)

hashtags_list = []
if tweet_hashtags_raw is not None:
# If it's an iterable (list / array / Series), but not a string
if hasattr(tweet_hashtags_raw, '__iter__') and not isinstance(tweet_hashtags_raw, str):
try:
hashtags_list = list(tweet_hashtags_raw)
except TypeError:
hashtags_list = []
else:
# Scalar value: drop NaN/NA if possible
try:
if not pd.isna(tweet_hashtags_raw):
hashtags_list = [tweet_hashtags_raw]
except Exception:
hashtags_list = [tweet_hashtags_raw]

# Normalize hashtags to lowercase
hashtags_lower = [
str(h).lower().strip()
for h in hashtags_list
if h is not None and str(h).strip() != ''
]

label_without_hash = job_label_normalized.lstrip('#')
label_with_hash = f"#{label_without_hash}"

# Check if job label is in the hashtags array
label_matches = (
label_with_hash in hashtags_lower or
label_without_hash in hashtags_lower or
# Fallback: check main label field
entity_label == label_with_hash or
entity_label == label_without_hash
)

elif platform == 'reddit':
# For Reddit: check with and without r/ prefix
label_matches = (
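
The crux of this hunk: the old code called `pd.notna(tweet_hashtags)` before checking iterability, but `pd.notna` applied to a list, numpy array, or Series returns an element-wise boolean array, and using that result in an `if`/`and` raises `ValueError: The truth value of an array is ambiguous`. The rewrite coerces the cell to a plain list first and only calls `pd.isna` on scalars. A minimal standalone sketch of the pattern (the sample values are illustrative, not taken from the PR):

import numpy as np
import pandas as pd

def coerce_hashtags(raw):
    """Coerce a hashtags cell (list, ndarray, Series, scalar, or NaN) to a plain list."""
    if raw is None:
        return []
    if hasattr(raw, '__iter__') and not isinstance(raw, str):
        try:
            return list(raw)
        except TypeError:
            return []
    try:
        # pd.isna is only safe here because raw is now known to be scalar
        return [] if pd.isna(raw) else [raw]
    except Exception:
        return [raw]

# Element-wise result, so `if pd.notna(raw):` on an array-like would raise ValueError
print(pd.notna(np.array(['#BTC', '#eth'])))  # -> [ True  True ]

hashtags_lower = [str(h).lower().strip()
                  for h in coerce_hashtags(np.array(['#BTC', ' #eth ']))
                  if h is not None and str(h).strip() != '']
label_without_hash = 'btc'   # illustrative normalized job label
label_with_hash = '#btc'
print(label_with_hash in hashtags_lower or label_without_hash in hashtags_lower)  # -> True
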
@@ -641,7 +642,10 @@ async def _check_job_content_match(
# Record mismatch sample
if len(mismatch_samples) < 10:
mismatch_samples.append(
f"Job {job_id[:8]}: Required label={job_label or 'any'} keyword={job_keyword or 'any'}, label_matched={label_matches} keyword_matched={keyword_matches} - {uri or 'unknown'}"
f"Job {job_id[:8]}: Required label={job_label or 'any'} "
f"keyword={job_keyword or 'any'}, "
f"label_matched={label_matches} keyword_matched={keyword_matches} "
f"- {uri or 'unknown'}"
)

except Exception as e:
@@ -652,7 +656,8 @@ async def _check_job_content_match(

# Log job match check details (same style as regular validation)
bt.logging.info(
f"{miner_hotkey}: S3 job match validation: Checked {total_checked} entities, {total_matched} matched job requirements ({match_rate:.1f}%)"
f"{miner_hotkey}: S3 job match validation: Checked {total_checked} entities, "
f"{total_matched} matched job requirements ({match_rate:.1f}%)"
)
if checked_uris:
bt.logging.info(f"{miner_hotkey}: S3 job match: Sample URIs checked: {checked_uris[:5]}")
@@ -1077,10 +1082,10 @@ def _create_twitter_data_entity(self, row) -> Optional[DataEntity]:
username = row.get('username', '')
text = row.get('text', '')
url = row.get('url', row.get('uri', ''))

if not all([username, text, url]):
return None

# Get datetime from row or use current time
datetime_val = row.get('datetime', row.get('timestamp', ''))
if datetime_val and pd.notna(datetime_val):
@@ -1092,47 +1097,76 @@ def _create_twitter_data_entity(self, row) -> Optional[DataEntity]:
timestamp = dt.datetime.now(dt.timezone.utc)
else:
timestamp = dt.datetime.now(dt.timezone.utc)


# ---- SAFE NORMALIZATION FOR ARRAY-LIKE FIELDS ----
# tweet_hashtags can be list/array/Series or scalar/NaN
raw_hashtags = row.get('tweet_hashtags', None)
if raw_hashtags is None:
tweet_hashtags = []
elif hasattr(raw_hashtags, '__iter__') and not isinstance(raw_hashtags, str):
# Iterable (list, numpy array, Series, etc.)
try:
tweet_hashtags = list(raw_hashtags)
except TypeError:
tweet_hashtags = []
else:
# Scalar: drop NaN/NA if possible
try:
tweet_hashtags = [] if pd.isna(raw_hashtags) else [raw_hashtags]
except Exception:
tweet_hashtags = [raw_hashtags]

# media can also be non-scalar; we only want to clear obvious NaN
raw_media = row.get('media', None)
if isinstance(raw_media, float):
# Handle float("nan") specially
media_value = None if math.isnan(raw_media) else raw_media
else:
media_value = raw_media
# ---------------------------------------------------

# Create XContent object with ALL uploaded fields
x_content = XContent(
username=str(username),
text=str(text),
url=str(url),
timestamp=timestamp,
tweet_hashtags=row.get('tweet_hashtags', []) if pd.notna(row.get('tweet_hashtags')) else [],
media=row.get('media', None) if pd.notna(row.get('media')) else None,
tweet_hashtags=tweet_hashtags,
media=media_value,
user_id=str(row.get('user_id', '')),
user_display_name=str(row.get('user_display_name', username)),
# user_display_name is OPTIONAL (None when NaN/missing)
user_display_name=str(row.get('user_display_name', '')) if pd.notna(row.get('user_display_name', None)) else None,
user_verified=bool(row.get('user_verified', False)),
tweet_id=str(row.get('tweet_id', '')),
is_reply=bool(row.get('is_reply', False)),
is_quote=bool(row.get('is_quote', False)),
conversation_id=str(row.get('conversation_id', '')),
in_reply_to_user_id=str(row.get('in_reply_to_user_id', '')),
# Add missing engagement metrics that miners upload
language=str(row.get('language', '')) if pd.notna(row.get('language')) else None,
in_reply_to_username=str(row.get('in_reply_to_username', '')) if pd.notna(row.get('in_reply_to_username')) else None,
quoted_tweet_id=str(row.get('quoted_tweet_id', '')) if pd.notna(row.get('quoted_tweet_id')) else None,
like_count=int(row.get('like_count', 0)) if pd.notna(row.get('like_count')) else None,
retweet_count=int(row.get('retweet_count', 0)) if pd.notna(row.get('retweet_count')) else None,
reply_count=int(row.get('reply_count', 0)) if pd.notna(row.get('reply_count')) else None,
quote_count=int(row.get('quote_count', 0)) if pd.notna(row.get('quote_count')) else None,
view_count=int(row.get('view_count', 0)) if pd.notna(row.get('view_count')) else None,
bookmark_count=int(row.get('bookmark_count', 0)) if pd.notna(row.get('bookmark_count')) else None,
language=str(row.get('language', '')) if pd.notna(row.get('language', None)) else None,
in_reply_to_username=str(row.get('in_reply_to_username', '')) if pd.notna(row.get('in_reply_to_username', None)) else None,
quoted_tweet_id=str(row.get('quoted_tweet_id', '')) if pd.notna(row.get('quoted_tweet_id', None)) else None,
like_count=int(row.get('like_count', 0)) if pd.notna(row.get('like_count', None)) else None,
retweet_count=int(row.get('retweet_count', 0)) if pd.notna(row.get('retweet_count', None)) else None,
reply_count=int(row.get('reply_count', 0)) if pd.notna(row.get('reply_count', None)) else None,
quote_count=int(row.get('quote_count', 0)) if pd.notna(row.get('quote_count', None)) else None,
view_count=int(row.get('view_count', 0)) if pd.notna(row.get('view_count', None)) else None,
bookmark_count=int(row.get('bookmark_count', 0)) if pd.notna(row.get('bookmark_count', None)) else None,
# Add missing user profile data that miners upload
user_blue_verified=bool(row.get('user_blue_verified', False)) if pd.notna(row.get('user_blue_verified')) else None,
user_description=str(row.get('user_description', '')) if pd.notna(row.get('user_description')) else None,
user_location=str(row.get('user_location', '')) if pd.notna(row.get('user_location')) else None,
profile_image_url=str(row.get('profile_image_url', '')) if pd.notna(row.get('profile_image_url')) else None,
cover_picture_url=str(row.get('cover_picture_url', '')) if pd.notna(row.get('cover_picture_url')) else None,
user_followers_count=int(row.get('user_followers_count', 0)) if pd.notna(row.get('user_followers_count')) else None,
user_following_count=int(row.get('user_following_count', 0)) if pd.notna(row.get('user_following_count')) else None
user_blue_verified=bool(row.get('user_blue_verified', False)) if pd.notna(row.get('user_blue_verified', None)) else None,
user_description=str(row.get('user_description', '')) if pd.notna(row.get('user_description', None)) else None,
user_location=str(row.get('user_location', '')) if pd.notna(row.get('user_location', None)) else None,
profile_image_url=str(row.get('profile_image_url', '')) if pd.notna(row.get('profile_image_url', None)) else None,
cover_picture_url=str(row.get('cover_picture_url', '')) if pd.notna(row.get('cover_picture_url', None)) else None,
user_followers_count=int(row.get('user_followers_count', 0)) if pd.notna(row.get('user_followers_count', None)) else None,
user_following_count=int(row.get('user_following_count', 0)) if pd.notna(row.get('user_following_count', None)) else None
)

return XContent.to_data_entity(x_content)
except Exception:
return None
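
The same defensive normalization runs through the rewritten entity builder: array-like cells (`tweet_hashtags`, `media`) are coerced before any truthiness test, and each optional scalar goes through `pd.notna(...)` with an explicit `None` default so a NaN cell becomes `None` rather than the string `'nan'`. A hedged sketch of how a typical row would flow through this normalization (the row contents are invented for illustration; the real parquet schema may differ):

import math
import numpy as np
import pandas as pd

row = pd.Series({
    'username': 'alice',
    'url': 'https://x.com/alice/status/1',
    'tweet_hashtags': np.array(['#GM', '#Crypto']),  # array-like: unsafe to bool-test via pd.notna
    'media': float('nan'),                           # float NaN: should become None
    'user_display_name': float('nan'),               # optional field: None, not 'nan'
    'like_count': 12.0,
})

# tweet_hashtags: iterable (and not a string) -> plain list
raw_hashtags = row.get('tweet_hashtags', None)
if raw_hashtags is not None and hasattr(raw_hashtags, '__iter__') and not isinstance(raw_hashtags, str):
    tweet_hashtags = list(raw_hashtags)
else:
    tweet_hashtags = [] if raw_hashtags is None or pd.isna(raw_hashtags) else [raw_hashtags]

# media: clear an obvious float NaN, pass everything else through unchanged
raw_media = row.get('media', None)
media_value = None if isinstance(raw_media, float) and math.isnan(raw_media) else raw_media

# optional scalars: NaN -> None instead of str(nan) / int(nan)
display_name = str(row.get('user_display_name', '')) if pd.notna(row.get('user_display_name', None)) else None
like_count = int(row.get('like_count', 0)) if pd.notna(row.get('like_count', None)) else None

assert tweet_hashtags == ['#GM', '#Crypto']
assert media_value is None and display_name is None and like_count == 12
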



def _create_reddit_data_entity(self, row) -> Optional[DataEntity]:
"""Create Reddit DataEntity from parquet row"""
try: