-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgoogle_ai_studio_llm.py
More file actions
1182 lines (982 loc) · 54.1 KB
/
google_ai_studio_llm.py
File metadata and controls
1182 lines (982 loc) · 54.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from langchain.llms.base import LLM
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
import time
import re
from typing import Optional, List, Any
from pydantic import Field
class GoogleAIStudioLLM(LLM):
"""LangChain wrapper for Google AI Studio web interface"""
timeout: int = Field(default=30, description="Timeout for web operations")
headless: bool = Field(default=True, description="Run browser in headless mode")
driver: Optional[Any] = Field(default=None, exclude=True, description="Selenium WebDriver instance")
shutdown_requested: bool = Field(default=False, exclude=True, description="Flag to indicate shutdown requested")
def __init__(self, headless: bool = True, timeout: int = 30, **kwargs):
super().__init__(timeout=timeout, headless=headless, **kwargs)
self._setup_driver()
def _setup_driver(self):
"""Initialize Chrome driver with appropriate options"""
chrome_options = Options()
# Try to connect to existing Chrome with remote debugging
try:
chrome_options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
print("🔗 Attempting to connect to existing Chrome instance...")
self.driver = webdriver.Chrome(options=chrome_options)
print("✅ Connected to existing Chrome!")
# Test if the connection is actually working
try:
current_url = self.driver.current_url
print(f"📄 Current page: {current_url}")
except Exception as e:
print(f"⚠️ Chrome connection is stale: {e}")
raise Exception("Chrome connection failed")
except Exception as e:
print(f"⚠️ Could not connect to existing Chrome: {e}")
print("🚀 Starting new Chrome instance...")
# Fallback to starting new Chrome
chrome_options = Options() # Reset options
if self.headless:
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1218,953")
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.set_page_load_timeout(self.timeout)
# Set longer timeouts for element operations
self.driver.implicitly_wait(10) # Wait up to 10 seconds for elements
# Navigate to AI Studio
print("🌐 Navigating to Google AI Studio...")
self.driver.get("https://aistudio.google.com/prompts/new_chat")
time.sleep(3) # Wait for page load
print("📄 Page loaded!")
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
"""Send prompt and get response from Google AI Studio"""
print("\n" + "="*60)
print("🚀 STARTING LLM CALL")
print("="*60)
print(f"📝 Prompt: {prompt[:200]}...")
start_time = time.time()
try:
# STEP 0: Capture current page state BEFORE sending message
print("\n📸 STEP 0: Capturing current page state...")
old_response = self._capture_current_response()
print(f"📋 Previous response captured: {old_response[:100] if old_response else 'None'}...")
# Use the prompt as-is without artificial tags
formatted_prompt = prompt
print("📝 Using raw prompt without artificial tags")
print(f"🏷️ Formatted prompt length: {len(formatted_prompt)} chars")
# Find and fill the input area
print("\n📤 STEP 1: Sending message...")
send_start = time.time()
self._send_message(formatted_prompt)
send_time = time.time() - send_start
print(f"✅ Message sent in {send_time:.1f} seconds")
# Wait for and extract NEW response
print("\n📥 STEP 2: Waiting for NEW response...")
response_start = time.time()
response = self._get_new_response(old_response)
response_time = time.time() - response_start
print(f"✅ NEW response received in {response_time:.1f} seconds")
# Use the response as-is since it was extracted from actual HTML elements
print("\n🔍 STEP 3: Processing response...")
final_response = response.strip()
print(f"✅ Using AI response extracted from HTML: {final_response[:100]}...")
total_time = time.time() - start_time
print(f"\n🎉 CALL COMPLETED in {total_time:.1f} seconds")
print("="*60)
return final_response
except Exception as e:
total_time = time.time() - start_time
error_msg = f"Error: {str(e)}"
print(f"\n❌ CALL FAILED after {total_time:.1f} seconds")
print(f"Error: {error_msg}")
print("="*60)
return error_msg
def _activate_textarea(self):
"""Activate the textarea by clicking on the input area (optimized)"""
try:
print("🎯 Quick textarea activation...")
# Try the most likely selectors first (based on our debug)
quick_selectors = [
".input-placeholder", # This worked in our test
".placeholder-overlay",
".prompt-input-wrapper"
]
for selector in quick_selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements and elements[0].is_displayed():
print(f" 🎯 Clicking {selector}...")
elements[0].click()
time.sleep(1) # Reduced wait time
# Quick check if textarea is visible
textareas = self.driver.find_elements(By.TAG_NAME, "textarea")
if textareas and textareas[0].is_displayed():
print(f" ✅ Activated by {selector}")
return True
except Exception as e:
continue
# If quick methods fail, textarea might already be active
textareas = self.driver.find_elements(By.TAG_NAME, "textarea")
if textareas and textareas[0].is_displayed():
print(" ✅ Textarea already active")
return True
print(" ⚠️ Could not activate textarea")
return False
except Exception as e:
print(f"❌ Error activating textarea: {e}")
return False
def _send_message(self, message: str):
"""Send message to the chat interface"""
print(f"📝 Sending message: {message[:100]}...")
print(f"📏 Message length: {len(message)} characters")
# Check message length and truncate if too long
MAX_MESSAGE_LENGTH = 8000 # 8KB limit for stability
if len(message) > MAX_MESSAGE_LENGTH:
print(f"⚠️ Message too long ({len(message)} chars), truncating to {MAX_MESSAGE_LENGTH}")
message = message[:MAX_MESSAGE_LENGTH] + "\n\n[Message truncated due to length limit]"
# Use shorter waits with explicit checks
short_wait = WebDriverWait(self.driver, 5) # Shorter timeout for faster debugging
try:
# First, try to activate the textarea
print("🎯 Activating textarea...")
self._activate_textarea()
# Look for the text input area (use Puppeteer-confirmed selectors)
input_selectors = [
"textarea", # Simple selector that works
"ms-autosize-textarea textarea", # From Puppeteer path
"ms-text-chunk textarea", # More specific from Puppeteer
"ms-chunk-input textarea", # Even more specific
"[aria-label*='Type something']", # From Puppeteer aria selector
"textarea[placeholder*='Enter a prompt']",
"[contenteditable='true']"
]
print("🔍 Looking for input element...")
input_element = None
# Quick element search (optimized)
for i, selector in enumerate(input_selectors):
try:
print(f" Trying selector {i+1}/{len(input_selectors)}: {selector}")
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
element = elements[0]
if element.is_displayed() and element.is_enabled():
input_element = element
print(f"✅ Found input element with selector: {selector}")
break
except Exception as e:
print(f" ❌ Selector failed: {str(e)[:30]}...")
continue
if not input_element:
print("❌ Could not find any input element")
raise Exception("Could not find input element")
# Clear and enter the message with chunked typing for large messages
print("🧹 Clearing input field...")
input_element.clear()
time.sleep(1) # Wait after clearing
print("📋 Pasting message...")
self._paste_message(input_element, message)
print("✅ Message entered successfully")
# Send message using Ctrl+Enter (most reliable)
print("⌨️ Sending message with Ctrl+Enter...")
try:
input_element.send_keys(Keys.CONTROL + Keys.ENTER)
print("✅ Sent Ctrl+Enter!")
time.sleep(3) # Wait for message to be sent
# Quick verification
if self._verify_message_sent():
print("✅ Message sent successfully!")
else:
print("⚠️ Ctrl+Enter might not have worked, trying button click...")
self._click_run_button()
except Exception as e:
print(f"❌ Ctrl+Enter failed: {e}")
print("🔄 Falling back to button click...")
self._click_run_button()
except Exception as e:
print(f"❌ Failed to send message: {str(e)}")
raise Exception(f"Failed to send message: {str(e)}")
def _paste_message(self, input_element, message: str):
"""Paste message using clipboard - much faster than typing"""
import pyperclip
try:
print(f"📋 Using clipboard paste for {len(message)} characters...")
# Save current clipboard content
original_clipboard = ""
try:
original_clipboard = pyperclip.paste()
except:
pass # Ignore if clipboard is empty
# Copy message to clipboard
pyperclip.copy(message)
print("📋 Message copied to clipboard")
# Focus the input element and paste
input_element.click()
time.sleep(0.5) # Wait for focus
# Use Ctrl+V to paste
input_element.send_keys(Keys.CONTROL + 'v')
print("📋 Message pasted successfully")
# Wait a moment for paste to complete
if not self.shutdown_requested:
time.sleep(1)
# Restore original clipboard content
try:
if original_clipboard:
pyperclip.copy(original_clipboard)
except:
pass # Ignore if restore fails
except ImportError:
print("⚠️ pyperclip not available, falling back to typing...")
self._type_message_fallback(input_element, message)
except Exception as e:
print(f"⚠️ Clipboard paste failed: {e}, falling back to typing...")
self._type_message_fallback(input_element, message)
def _type_message_fallback(self, input_element, message: str):
"""Fallback typing method if clipboard paste fails"""
CHUNK_SIZE = 2000 # Larger chunks since we're fallback
if len(message) <= CHUNK_SIZE:
# Small message, type normally
input_element.send_keys(message)
return
# Large message, type in chunks
print(f"⌨️ Fallback: typing large message in chunks of {CHUNK_SIZE} characters...")
chunks = [message[i:i+CHUNK_SIZE] for i in range(0, len(message), CHUNK_SIZE)]
for i, chunk in enumerate(chunks):
print(f" ⌨️ Typing chunk {i+1}/{len(chunks)} ({len(chunk)} chars)")
input_element.send_keys(chunk)
# Small delay between chunks to prevent browser freeze
if i < len(chunks) - 1 and not self.shutdown_requested: # Don't wait after last chunk
time.sleep(1)
def _click_run_button_puppeteer_style(self):
"""Click run button using Puppeteer-style selectors"""
print("🎭 Trying Puppeteer-style button click...")
# Try the exact Puppeteer selector first
puppeteer_selectors = [
"run-button button",
"span.command > span",
"span.command span"
]
for selector in puppeteer_selectors:
try:
print(f" 🎯 Trying Puppeteer selector: {selector}")
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
element = elements[0] # Take first match
if element.is_displayed() and element.is_enabled():
element.click()
print(f"✅ Clicked with Puppeteer selector: {selector}")
time.sleep(2)
return True
except Exception as e:
print(f" ❌ Puppeteer selector failed: {e}")
continue
return False
def _click_run_button(self):
"""Click the run button to send the message"""
print("🔍 Looking for run button...")
# First, try to find any button that might be the send/run button
button_selectors = [
# Puppeteer-confirmed selectors (put first)
"run-button button", # From Puppeteer: run-button/button
"span.command", # From Puppeteer: span.command > span
"button[aria-label='Run']", # Exact match that works
"button[type='submit']", # This also works
# Fallback selectors
"button[aria-label*='Send']",
"button[aria-label*='Run']",
"button[title*='Send']",
"button[title*='Run']",
"button.send-button",
"button.run-button",
# Look for buttons with icons
"button:has(mat-icon)",
"button:has(svg)",
"button mat-icon",
# Broader selectors
"button[data-testid*='send']",
"button[data-testid*='run']",
"button[data-testid*='submit']",
# Very broad - any clickable button near textarea
"form button",
"div button"
]
# Try each selector with a short wait
for i, selector in enumerate(button_selectors):
try:
print(f" 🔍 Trying selector {i+1}/{len(button_selectors)}: {selector}")
# Use a very short wait for each attempt
buttons = self.driver.find_elements(By.CSS_SELECTOR, selector)
if buttons:
print(f" 📍 Found {len(buttons)} button(s) with this selector")
# Try each button found
for j, button in enumerate(buttons):
try:
# Check if button is visible and enabled
if button.is_displayed() and button.is_enabled():
button_text = button.text.strip()
aria_label = button.get_attribute('aria-label') or ''
title = button.get_attribute('title') or ''
print(f" 🎯 Button {j+1}: text='{button_text}', aria-label='{aria_label}', title='{title}'")
# Click the button
button.click()
print(f"✅ Successfully clicked button {j+1}!")
# Wait a moment to see if it worked
time.sleep(2)
return
except Exception as e:
print(f" ❌ Failed to click button {j+1}: {str(e)[:30]}...")
continue
except Exception as e:
print(f" ❌ Selector failed: {str(e)[:30]}...")
continue
# Fallback methods if no button found
print("⌨️ No button found, trying keyboard shortcuts...")
# Method 1: Enter key
try:
print(" 🔑 Trying Enter key...")
active_element = self.driver.switch_to.active_element
active_element.send_keys(Keys.ENTER)
print("✅ Enter key sent!")
time.sleep(2)
return
except Exception as e:
print(f" ❌ Enter key failed: {e}")
# Method 2: Ctrl+Enter
try:
print(" 🔑 Trying Ctrl+Enter...")
active_element = self.driver.switch_to.active_element
active_element.send_keys(Keys.CONTROL + Keys.ENTER)
print("✅ Ctrl+Enter sent!")
time.sleep(2)
return
except Exception as e:
print(f" ❌ Ctrl+Enter failed: {e}")
# Method 3: Tab to button and press Enter
try:
print(" 🔑 Trying Tab navigation...")
active_element = self.driver.switch_to.active_element
# Tab a few times to find the send button
for i in range(5):
active_element.send_keys(Keys.TAB)
time.sleep(0.5)
current_element = self.driver.switch_to.active_element
tag_name = current_element.tag_name.lower()
if tag_name == 'button':
print(f" 🎯 Found button after {i+1} tabs")
current_element.send_keys(Keys.ENTER)
print("✅ Button activated with Enter!")
time.sleep(2)
return
except Exception as e:
print(f" ❌ Tab navigation failed: {e}")
# Try Puppeteer-style clicking as final attempt
print("🎭 Trying Puppeteer-style button detection...")
if self._click_run_button_puppeteer_style():
return
# Final attempt: Use Ctrl+Enter (most reliable for Google AI Studio)
print("⌨️ Final attempt: Using Ctrl+Enter to send message...")
try:
# Focus on the textarea first
textareas = self.driver.find_elements(By.TAG_NAME, "textarea")
if textareas:
textarea = textareas[0]
textarea.click() # Focus
time.sleep(0.5)
# Send Ctrl+Enter
textarea.send_keys(Keys.CONTROL + Keys.ENTER)
print("✅ Sent Ctrl+Enter!")
time.sleep(2)
return
except Exception as e:
print(f"❌ Ctrl+Enter failed: {e}")
print("❌ Could not find or activate any send button")
print("💡 The message may have been entered but not sent")
raise Exception("Could not find or click run button")
def _verify_message_sent(self) -> bool:
"""Verify that the message was actually sent"""
try:
# Wait a moment for the UI to update
time.sleep(3)
# Look for indicators that the message was sent
# 1. Input field should be cleared or disabled
input_elements = self.driver.find_elements(By.CSS_SELECTOR, "textarea")
for input_elem in input_elements:
if input_elem.is_displayed():
current_text = input_elem.get_attribute('value') or input_elem.text
if len(current_text.strip()) == 0:
print(" ✅ Input field is cleared - message likely sent")
return True
# 2. Look for loading indicators or "thinking" messages
loading_indicators = [
"[data-testid*='loading']",
".loading",
".spinner",
"div:contains('thinking')",
"div:contains('generating')"
]
for selector in loading_indicators:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements and any(elem.is_displayed() for elem in elements):
print(" ✅ Loading indicator found - message sent")
return True
except:
continue
# 3. Check if page content changed (new response appearing)
page_text = self.driver.find_element(By.TAG_NAME, "body").text
if any(indicator in page_text.lower() for indicator in
['generating', 'thinking', 'processing', 'please wait']):
print(" ✅ AI is processing - message sent")
return True
print(" ⚠️ No clear indication message was sent")
return False
except Exception as e:
print(f" ❌ Error verifying message sent: {e}")
return False
def _alternative_send_method(self):
"""Alternative method to send the message"""
try:
print(" 🔄 Trying alternative send method...")
# Method 1: Look for any button and click it
all_buttons = self.driver.find_elements(By.TAG_NAME, "button")
print(f" 📊 Found {len(all_buttons)} total buttons on page")
for i, button in enumerate(all_buttons):
try:
if button.is_displayed() and button.is_enabled():
button_text = (button.text or '').strip().lower()
aria_label = (button.get_attribute('aria-label') or '').lower()
# Look for send-like buttons
if any(keyword in button_text + ' ' + aria_label for keyword in
['send', 'run', 'submit', 'go', 'execute']):
print(f" 🎯 Trying button {i}: '{button_text}' / '{aria_label}'")
button.click()
time.sleep(2)
return
except Exception as e:
continue
# Method 2: Try clicking near the textarea
print(" 🎯 Trying to click near textarea...")
textareas = self.driver.find_elements(By.CSS_SELECTOR, "textarea")
if textareas:
textarea = textareas[0]
# Use JavaScript to find and click nearby buttons
self.driver.execute_script("""
var textarea = arguments[0];
var parent = textarea.parentElement;
var buttons = parent.querySelectorAll('button');
for (var i = 0; i < buttons.length; i++) {
if (buttons[i].offsetParent !== null) { // visible
buttons[i].click();
break;
}
}
""", textarea)
print(" ✅ Clicked nearby button with JavaScript")
time.sleep(2)
return
print(" ❌ Alternative send method failed")
except Exception as e:
print(f" ❌ Alternative send method error: {e}")
def _capture_current_response(self) -> str:
"""Capture the current AI response from HTML elements before sending new message"""
try:
# Try to get current AI response from DOM
current_response = self._extract_ai_response_from_dom()
if current_response:
print(f"📋 Found previous AI response: '{current_response[:50]}...'")
return current_response
# Fallback to text extraction
body_text = self.driver.find_element(By.TAG_NAME, "body").text
text_response = self._extract_latest_ai_response_from_text(body_text, "")
if text_response:
print(f"📋 Found previous text response: '{text_response[:50]}...'")
return text_response
print("📋 No previous AI responses found")
return ""
except Exception as e:
print(f"⚠️ Could not capture current response: {e}")
return ""
def _get_new_response(self, old_response: str) -> str:
"""Wait for a NEW AI response by detecting actual HTML response elements"""
print(f"⏳ Waiting for NEW AI response from HTML elements...")
print(f"🔍 Previous response to compare: '{old_response[:50]}...'")
max_attempts = 30 # 30 attempts = up to 5 minutes (10 seconds each)
attempt = 0
last_seen_content = ""
stable_count = 0
while attempt < max_attempts:
attempt += 1
elapsed_time = attempt * 10
print(f"🔍 Attempt {attempt}/{max_attempts} (elapsed: {elapsed_time}s)")
try:
# Check if shutdown was requested
if self.shutdown_requested:
print("🛑 Shutdown requested, stopping response wait...")
return "Operation cancelled due to shutdown"
# Check if driver is still available
if not self.driver:
print("❌ Driver is None, cannot continue")
return "Error: Browser connection lost"
# Method 1: Look for actual AI response elements in the DOM
ai_response = self._extract_ai_response_from_dom()
if ai_response and ai_response != old_response and len(ai_response) > 5: # Lower threshold for short responses
# Check if content has been stable (indicating completion)
if ai_response == last_seen_content:
stable_count += 1
print(f" 📊 AI response stable for {stable_count} attempts")
# If stable for 3 attempts (30 seconds), consider it complete
if stable_count >= 3:
print(f"✅ Found STABLE AI response from DOM!")
print(f"📝 Response: {ai_response[:200]}...")
return ai_response
else:
last_seen_content = ai_response
stable_count = 0
print(f" 🔄 AI response still changing: {len(ai_response)} chars")
print(f" 📝 Current: {ai_response[:100]}...")
# Method 2: Fallback to text-based extraction
if not ai_response and self.driver:
body_text = self.driver.find_element(By.TAG_NAME, "body").text
current_response = self._extract_latest_ai_response_from_text(body_text, old_response)
if current_response and current_response != old_response:
if current_response == last_seen_content:
stable_count += 1
if stable_count >= 3:
print(f"✅ Found STABLE text-based response!")
print(f"📝 Response: {current_response[:200]}...")
return current_response
else:
last_seen_content = current_response
stable_count = 0
print(f" 🔄 Text response changing: {current_response[:100]}...")
# Show debugging info every minute
if attempt % 6 == 0:
print(f" 🔍 Checking for response elements...")
self._debug_page_structure()
# Wait before next attempt, but check for shutdown
for i in range(10): # Sleep in 1-second intervals
if self.shutdown_requested:
print("🛑 Shutdown requested, stopping response wait...")
return "Operation cancelled due to shutdown"
time.sleep(1)
except Exception as e:
print(f" ❌ Error in attempt {attempt}: {e}")
time.sleep(10)
# Final fallback
print("⚠️ Maximum attempts reached, using final extraction...")
try:
# Try DOM extraction one more time
final_response = self._extract_ai_response_from_dom()
if final_response and final_response != old_response:
print(f"⚠️ Using final DOM response: {final_response[:200]}...")
return final_response
# Last resort: text extraction
body_text = self.driver.find_element(By.TAG_NAME, "body").text
fallback_response = self._extract_latest_ai_response_from_text(body_text, old_response)
if fallback_response and fallback_response != old_response:
print(f"⚠️ Using final text response: {fallback_response[:200]}...")
return fallback_response
return "Error: No new AI response found after maximum wait time"
except:
return "Error: Could not extract any response from page"
def _extract_ai_response_from_dom(self) -> str:
"""Extract AI response directly from DOM elements"""
try:
# Check if driver is available
if not self.driver or self.shutdown_requested:
return ""
# We'll exclude Google Search Suggestions elements later, after response appears
excluded_elements = set()
# Look for the specific response elements based on the HTML structure
# The response is in: <span class="ng-star-inserted">Hello! How can I help you today?</span>
# Try specific selectors for the AI response
ai_response_selectors = [
"ms-text-chunk span.ng-star-inserted", # Most specific
".model-prompt-container span.ng-star-inserted", # Model container
"ms-cmark-node span.ng-star-inserted", # Content node
"span.ng-star-inserted", # Broader span selector
]
for selector in ai_response_selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
# Check each element for AI response content
for element in elements:
text = element.text.strip()
if text and len(text) > 10:
# Skip user messages and UI elements
if not any(skip in text.lower() for skip in
['user:', 'assistant:', 'edit', 'rerun', 'more options', 'thoughts']):
# Check if it looks like an AI response
if any(pattern in text.lower() for pattern in
['hello', 'help', 'can i', 'how may', 'what can', 'i am', 'sure']):
print(f" 🎯 Found AI response with {selector}: '{text}'")
return text
except Exception as e:
continue
# Try to find the model response container specifically
try:
model_containers = self.driver.find_elements(By.CSS_SELECTOR, ".model-prompt-container")
if model_containers:
# Get the last (most recent) model response
last_container = model_containers[-1]
container_text = last_container.text.strip()
# Extract just the response part (skip UI elements)
lines = container_text.split('\n')
for line in lines:
line = line.strip()
if line and len(line) > 10:
# Skip UI elements
if not any(ui in line.lower() for ui in
['edit', 'rerun', 'more options', 'good response', 'bad response']):
# This looks like the actual response
print(f" 🎯 Found model response: '{line}'")
return line
except Exception as e:
print(f" ⚠️ Error checking model containers: {e}")
# Fallback: broader selectors
response_selectors = [
"span.ng-star-inserted", # Based on HTML structure
"p", # Paragraph elements
"div", # Div elements
]
for selector in response_selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
# Get the last (most recent) element
last_element = elements[-1]
response_text = last_element.text.strip()
if response_text and len(response_text) > 5: # Lower threshold
# Check if this element is a Google Search Suggestion
if self._is_google_suggestion_element(last_element):
print(f" 🚫 Skipping Google suggestion element: {response_text[:50]}...")
continue
# Look for AI response patterns (more comprehensive)
ai_response_patterns = [
'hello', 'hi', 'how can i help', 'i can help', 'sure', 'yes',
'here is', 'here are', 'this is', 'that is', 'i am', 'i will',
'let me', 'of course', 'certainly', 'absolutely', 'test', 'testing',
'the answer', 'to answer', 'response', 'reply', 'understand',
'question', 'ask', 'help', 'assist', 'explain', 'provide'
]
response_lower = response_text.lower()
# Check if it contains AI response patterns
has_ai_pattern = any(pattern in response_lower for pattern in ai_response_patterns)
# Skip obvious UI elements
skip_patterns = [
'run', 'clear', 'export', 'share', 'enter a prompt', 'sign in',
'home', 'settings', 'menu', 'button', 'click', 'expand', 'collapse',
'run prompt', 'advanced settings', 'google ai models may make mistakes'
]
has_ui_pattern = any(skip_word in response_lower for skip_word in skip_patterns)
# More aggressive detection - if it's substantial and not UI, it's probably AI response
is_substantial = len(response_text.strip()) > 15
is_not_ui = not has_ui_pattern
is_not_old_response = response_text != old_response if 'old_response' in locals() else True
if (has_ai_pattern or (is_substantial and is_not_ui)) and is_not_old_response:
print(f" 🎯 Found potential AI response with selector: {selector}")
print(f" Content: '{response_text[:100]}...'")
print(f" Length: {len(response_text)} chars")
return response_text
else:
print(f" 🚫 Skipping non-AI element: {response_text[:50]}...")
except:
continue
# Try to find response by looking for conversation structure
return self._extract_from_conversation_structure()
except Exception as e:
print(f" ⚠️ DOM extraction error: {e}")
return ""
def _extract_from_conversation_structure(self) -> str:
"""Extract response by analyzing conversation structure"""
try:
# Check if driver is available
if not self.driver or self.shutdown_requested:
return ""
# Look for elements that might contain the conversation
conversation_selectors = [
"main",
"[role='main']",
".conversation",
".chat-container",
".messages"
]
for selector in conversation_selectors:
try:
container = self.driver.find_element(By.CSS_SELECTOR, selector)
# Get all text content and try to identify the AI response
full_text = container.text
# Split into potential message blocks
lines = full_text.split('\n')
# Look for the last substantial block of text that looks like an AI response
for i in range(len(lines) - 1, -1, -1):
line = lines[i].strip()
if len(line) > 50: # Substantial content
# Check if it looks like an AI response (not user input, UI, or suggestions)
skip_patterns = [
'enter a prompt', 'run', 'clear', 'user:', 'you:',
'google search suggestions', 'display of search suggestions',
'learn more', 'google logo', 'www.google.com/search',
'grounding with google search', 'safesearch=active',
'client=app-vertex-grounding'
]
if not any(skip in line.lower() for skip in skip_patterns):
# This might be the AI response
return line
except:
continue
except Exception as e:
print(f" ⚠️ Conversation structure extraction error: {e}")
return ""
def _extract_latest_ai_response_from_text(self, body_text: str, old_response: str) -> str:
"""Extract the latest AI response from page text"""
lines = body_text.split('\n')
# Look for conversation patterns
potential_responses = []
for i, line in enumerate(lines):
line = line.strip()
if len(line) < 20: # Skip short lines
continue
# Skip obvious UI elements
ui_skip_patterns = [
'run', 'clear', 'export', 'share', 'menu', 'button', 'sign in',
'enter a prompt', 'google ai studio', 'home', 'settings'
]
if any(ui_word in line.lower() for ui_word in ui_skip_patterns):
continue
# Skip Google Search Suggestions (these appear WITH the response)
suggestion_skip_patterns = [
'google search suggestions', 'display of search suggestions',
'learn more', 'google logo', 'safesearch=active',
'www.google.com/search', 'client=app-vertex-grounding',
'grounding with google search', 'documentation-path',
'typescript command line', 'typescript cli prompt',
'typescript icommand interface', 'apply_diff format'
]
if any(suggestion in line.lower() for suggestion in suggestion_skip_patterns):
print(f" 🚫 Skipping Google suggestion line: {line[:50]}...")
continue
# Skip user input indicators
if any(user_indicator in line.lower() for user_indicator in
['user:', 'you:', 'human:', 'prompt:']):
continue
# This might be an AI response
potential_responses.append(line)
# Return the last substantial response that's different from old_response
for response in reversed(potential_responses):
if response != old_response and len(response) > 30:
return response
return ""
def _is_google_suggestion_element(self, element) -> bool:
"""Check if an element is part of Google Search Suggestions"""
try:
# Check element classes and attributes
element_class = element.get_attribute('class') or ''
element_id = element.get_attribute('id') or ''
# Check if element or its parents have suggestion-related classes
suggestion_indicators = [
'search-entry-container',
'search-entry-container-title',
'search-entry-container-description',
'carousel',
'chip'
]
if any(indicator in element_class.lower() for indicator in suggestion_indicators):
return True
# Check parent elements
try:
parent = element.find_element(By.XPATH, '..')
parent_class = parent.get_attribute('class') or ''
if any(indicator in parent_class.lower() for indicator in suggestion_indicators):
return True
# Check grandparent
grandparent = parent.find_element(By.XPATH, '..')
grandparent_class = grandparent.get_attribute('class') or ''
if any(indicator in grandparent_class.lower() for indicator in suggestion_indicators):
return True
except:
pass
# Check element text content for suggestion patterns
element_text = element.text.lower()
suggestion_text_patterns = [
'google search suggestions',
'display of search suggestions',
'learn more',
'www.google.com/search',
'safesearch=active',
'client=app-vertex-grounding'
]
if any(pattern in element_text for pattern in suggestion_text_patterns):
return True
return False
except Exception as e:
# If we can't check, assume it's not a suggestion
return False
def _debug_page_structure(self):
"""Debug helper to understand page structure"""
try:
# Get page title
title = self.driver.title
print(f" 📄 Page title: {title}")
# Count elements that might contain responses
response_elements = self.driver.find_elements(By.CSS_SELECTOR, "div, p, pre, code")
print(f" 🔢 Found {len(response_elements)} potential content elements")
# Check for specific Google AI Studio elements
ai_elements = self.driver.find_elements(By.CSS_SELECTOR, "[data-testid*='response'], [data-testid*='message']")
if ai_elements: