Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updated test script for better handling of please enter based prompts… #22

Merged
merged 4 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
name: Quickstart Tests
on:
pull_request:
branches:
- main
pull_request: # Removed branches restriction to run on all PRs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove the comment, I guess you wanted me to read this comment? If that's the case then just comment it like this one in the PR. Because the comment in the code, doesn't make sense to a 3rd person.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

workflow_dispatch: # Add manual trigger option

jobs:
Expand All @@ -17,10 +15,10 @@ jobs:
DOCKER_LOGS_DIR: ${GITHUB_WORKSPACE}/logs

steps:
- uses: actions/checkout@v4 # Updated from v3
- uses: actions/checkout@v4

- name: Set up Python 3.10
uses: actions/setup-python@v5 # Updated from v4
uses: actions/setup-python@v5
with:
python-version: '3.10'

Expand Down Expand Up @@ -120,11 +118,11 @@ jobs:

- name: Upload test logs
if: always()
uses: actions/upload-artifact@v4 # Updated from v3
uses: actions/upload-artifact@v4
with:
name: test-logs
path: |
logs/
/logs/
.env
retention-days: 30 # Added retention period
retention-days: 30
145 changes: 101 additions & 44 deletions tests/test_run_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,17 @@ def check_docker_status(logger: logging.Logger, config_path: str) -> bool:
def check_service_health(logger: logging.Logger, config_path: str) -> tuple[bool, dict]:
"""Enhanced service health check with metrics."""
service_config = get_service_config(config_path)

if "mech" in config_path.lower():
logger.info("Bypassing health check for mech service")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.info("Bypassing health check for mech service")
logger.info("Bypassing health check for mech service because it's not supported")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

return True, {
'response_time': 0,
'status_code': 200,
'error': None,
'successful_checks': 1,
'total_checks': 1
}

health_check_url = service_config["health_check_url"]

metrics = {
Expand Down Expand Up @@ -258,7 +269,6 @@ def get_token_config():
def handle_erc20_funding(output: str, logger: logging.Logger, rpc_url: str) -> str:
"""Handle funding requirement using Tenderly API for ERC20 tokens."""
pattern = r"\[(optimistic|base|mode)\].*Please make sure Master (?:EOA|Safe) (0x[a-fA-F0-9]{40}) has at least ([0-9.]+) ([A-Z]+)"
logger.info(f"Funding with RPC : {rpc_url}")
match = re.search(pattern, output)
if match:
chain = match.group(1)
Expand Down Expand Up @@ -341,7 +351,6 @@ def handle_native_funding(output: str, logger: logging.Logger, rpc_url: str, con
r"Please make sure Master Safe (0x[a-fA-F0-9]{40}) has at least (\d+\.\d+) (?:ETH|xDAI)"
]

logger.info(f"Funding with RPC : {rpc_url}")

for pattern in patterns:
match = re.search(pattern, output)
Expand Down Expand Up @@ -561,7 +570,7 @@ def get_config_files():
logger.info(f"Found config files: {[f.name for f in config_files]}")

# TODO: Support the test for memeooorr and mech
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove this comment also

return [str(f) for f in config_files if "mech" not in f.name]
return [str(f) for f in config_files]


def validate_backup_owner(backup_owner: str) -> str:
Expand All @@ -572,21 +581,44 @@ def validate_backup_owner(backup_owner: str) -> str:
raise ValueError(f"Invalid backup owner address: {backup_owner}")
return Web3.to_checksum_address(backup_owner)

def handle_env_var_prompt(output: str, logger: logging.Logger, config_path: str) -> str:
"""
Simple handler for environment variable prompts, returns value from config.
"""
try:
with open(config_path, 'r') as f:
config = json.load(f)

# Get prompt text after "Please enter"
prompt_match = re.search(r"Please enter (.*?)(?:\s*\[hidden input\])?[:.]?$", output.strip())
if not prompt_match:
logger.info("No prompt match found")
return '\n'

prompt_text = prompt_match.group(1).strip()
logger.info(f"Looking for config value for: {prompt_text}")

env_vars = config.get('env_variables', {})
# Look for the variable by its name field
for key, var_config in env_vars.items():
if prompt_text == var_config['name']:
logger.info(f"Found matching variable: {key}")
return var_config['value']

logger.warning(f"No value found in config for: {prompt_text}")
return '\n'

except Exception as e:
logger.error(f"Error handling env var prompt: {str(e)}")
return '\n'

def get_base_config() -> dict:
"""Get base configuration common to all services."""
base_config = {
"TEST_PASSWORD": os.getenv('TEST_PASSWORD', 'test_secret'),
"BACKUP_WALLET": validate_backup_owner("0x802D8097eC1D49808F3c2c866020442891adde57"),
"STAKING_CHOICE": '1'
}

# def handle_please_enter(output, logger, config):
# pass
# parse config
# for env_var in config.env_vars:
# if f"Please enter {env_var.name}" in output:
# return env_var.value
# return "\n"

# Common prompts used across all services
base_prompts = {
Expand All @@ -595,7 +627,6 @@ def get_base_config() -> dict:
r"Enter your choice": base_config["STAKING_CHOICE"],
r"Please input your backup owner \(leave empty to skip\)\:": base_config["BACKUP_WALLET"], # Escape parentheses
r"Press enter to continue": "\n",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
r"Press enter to continue": "\n",
r"Press enter to continue": "\n",
r"Please enter .*": lambda output, logger: handle_env_var_prompt(output, logger, config_path)

why not keep this line here and then remove it from all the agent-specific parts?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didnt do it because as its a get_base_config handler, didnt wanted to pass config paths into it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated it too

r"Please enter ": "\n",
}

return {"config": base_config, "prompts": base_prompts}
Expand All @@ -620,7 +651,8 @@ def get_config_specific_settings(config_path: str) -> dict:
r"Please make sure Master (EOA|Safe) .*has at least.*(?:ETH|xDAI)":
lambda output, logger: create_funding_handler(test_config["RPC_URL"], "modius")(output, logger),
r"Please make sure Master (?:EOA|Safe) .*has at least.*(?:USDC|OLAS)":
lambda output, logger: create_token_funding_handler(test_config["RPC_URL"])(output, logger)
lambda output, logger: create_token_funding_handler(test_config["RPC_URL"])(output, logger),
r"Please enter .*": lambda output, logger: handle_env_var_prompt(output, logger, config_path)
})

elif "optimus" in config_path.lower():
Expand Down Expand Up @@ -654,33 +686,22 @@ def get_chain_rpc(output: str, logger: logging.Logger) -> str:
r"\[(?:optimistic|base|mode)\].*Please make sure Master (EOA|Safe) .*has at least.*(?:ETH|xDAI)":
lambda output, logger: create_funding_handler(get_chain_rpc(output, logger), "optimus")(output, logger),
r"\[(?:optimistic|base|mode)\].*Please make sure Master (?:EOA|Safe) .*has at least.*(?:USDC|OLAS)":
lambda output, logger: create_token_funding_handler(get_chain_rpc(output, logger))(output, logger)
lambda output, logger: create_token_funding_handler(get_chain_rpc(output, logger))(output, logger),
r"Please enter .*": lambda output, logger: handle_env_var_prompt(output, logger, config_path)
})

elif "mech" in config_path.lower():
# Define API keys dict once
api_keys = {
"openai": ["dummy_api_key"],
"google_api_key": ["dummy_api_key"]
}
prompts = {k: v for k, v in base["prompts"].items() if k != r"Please enter"}

test_config = {
**base_config,
"RPC_URL": os.getenv('GNOSIS_RPC_URL', ''),
"RPC_URL": os.getenv('GNOSIS_RPC_URL', '')
}

def handle_api_keys(output: str, logger: logging.Logger) -> str:
"""Handler for API keys"""
logger.info("Sending API keys")
return json.dumps(api_keys) # Return the dictionary directly, not as a JSON string

# Add Mech-specific prompts
prompts.update({
r"eth_newFilter \[hidden input\]": test_config["RPC_URL"],
r"Please make sure Master (EOA|Safe) .*has at least.*(?:ETH|xDAI)":
lambda output, logger: create_funding_handler(test_config["RPC_URL"], "mech")(output, logger),
r"Please enter API keys": handle_api_keys # Use handler instead of direct dict
r"Please enter .*": lambda output, logger: handle_env_var_prompt(output, logger, config_path)
})

elif "memeooorr" in config_path.lower():
Expand All @@ -689,11 +710,10 @@ def handle_api_keys(output: str, logger: logging.Logger) -> str:
**base_config, # Include base config
"BASE_RPC_URL": os.getenv('BASE_RPC_URL'),
}

# Add Memeooorr-specific prompts
prompts.update({
r"Enter a Base RPC that supports eth_newFilter \[hidden input\]": test_config["BASE_RPC_URL"],
r"Please enter.*": "", # Handles all "Please enter" prompts with empty string
r"Please enter .*": lambda output, logger: handle_env_var_prompt(output, logger, config_path),
r"Please make sure Master (EOA|Safe) .*has at least.*(?:ETH|xDAI)":
lambda output, logger: create_funding_handler(test_config["BASE_RPC_URL"], "memeooorr")(output, logger),
})
Expand All @@ -704,12 +724,12 @@ def handle_api_keys(output: str, logger: logging.Logger) -> str:
**base_config, # Include base config
"RPC_URL": os.getenv('GNOSIS_RPC_URL', '')
}

# Add PredictTrader-specific prompts
prompts.update({
r"eth_newFilter \[hidden input\]": test_config["RPC_URL"],
r"Please make sure Master (EOA|Safe) .*has at least.*(?:ETH|xDAI)":
lambda output, logger: create_funding_handler(test_config["RPC_URL"], "predict_trader")(output, logger)
lambda output, logger: create_funding_handler(test_config["RPC_URL"], "predict_trader")(output, logger),
r"Please enter .*": lambda output, logger: handle_env_var_prompt(output, logger, config_path)
})

return {"prompts": prompts, "test_config": test_config}
Expand All @@ -720,23 +740,60 @@ def log_expect_match(child, pattern, match_index, logger):

def validate_input_match(pattern, response, prompt_text, logger):
"""Validate that the input matches the expected prompt."""
# Extract the actual prompt text if available
prompt_lower = prompt_text.lower() if prompt_text else ""

# Map of expected prompt types and their validation rules
prompt_validators = {
"password": lambda p, r: "password" in p.lower() and len(r.strip()) > 0,
"backup_owner": lambda p, r: "backup owner" in p.lower() and Web3.is_address(r.strip()),
"api_keys": lambda p, r: "api keys" in p.lower() and isinstance(json.loads(r.strip()), dict),
"rpc": lambda p, r: "eth_newfilter" in p.lower() and r.strip().startswith(("http://", "https://")),
"choice": lambda p, r: "choice" in p.lower() and r.strip().isdigit(),
"enter": lambda p, r: any(x in p.lower() for x in ["press enter", "continue"]) and r.strip() == ""
# Security/Authentication inputs
"password": lambda p: "password" in p,
"backup_owner": lambda p: "backup owner" in p,

# API Keys and Keys
"api_key": lambda p: any(x in p for x in [
"api key",
"api_key",
"apikey",
"tenderly api",
"coingecko api",
"gemini api"
]),

# Tenderly specific inputs
"tenderly_info": lambda p: any(x in p for x in [
"tenderly account",
"tenderly project",
"account slug",
"project slug"
]),

# Twitter related inputs
"twitter_creds": lambda p: any(x in p for x in [
"twitter username",
"twitter email",
"twitter password"
]),

# Technical configurations
"rpc": lambda p: any(x in p for x in ["rpc", "eth_newfilter"]),
"address": lambda p: any(x in p for x in ["address", "contract", "safe"]),

# Content/Description inputs
"persona": lambda p: "persona" in p,

# Navigation inputs
"choice": lambda p: "choice" in p,
"enter": lambda p: any(x in p for x in ["press enter", "continue"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it appears to me that these agent specific checks are bloating the whole thing too much. If I'm not wrong then they were added to resolve that ENS issue right? If yes, then they are not needed anymore. We can clean up a lot here, wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I shall remove this, once PR passes the test

}

# Try to determine prompt type
prompt_type = next((k for k, v in prompt_validators.items()
if v(pattern, response)), "unknown")

logger.debug(f"Processing input of type: {prompt_type}")

return prompt_type
for ptype, validator in prompt_validators.items():
if validator(prompt_lower):
logger.debug(f"Matched input type: {ptype} for prompt: {prompt_text}")
return ptype

logger.debug(f"No specific type match for prompt: {prompt_text}")
return "unknown"


def send_input_safely(child, response, prompt_type, logger):
Expand Down
Loading