-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathline_selector.py
135 lines (112 loc) · 5.51 KB
/
line_selector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import re
from aiohttp import web
from server import PromptServer
class LineSelector:
def __init__(self):
self._counter = -1
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"text": ("STRING", {"multiline": True}), # Input for multiple lines
"line_number": ("INT", {"default": 0, "min": 0, "max": 99999}), # 0 for random, >0 for specific line
"RANDOM": ("BOOLEAN", {"default": False}), # Force random selection
"LOOP": ("BOOLEAN", {"default": False}), # Return all lines as list
"LOOP_SEQUENTIAL": ("BOOLEAN", {"default": False}), # Sequential looping
"jump": ("INT", {"default": 1, "min": 1, "max": 100, "step": 1}), # Jump size for sequential loop
"pick_random_variable": ("BOOLEAN", {"default": False}), # Enable random choice functionality
},
"optional": {
"variables": ("STRING", {"multiline": True, "forceInput": True}),
"seed": ("INT", {
"default": -1,
"min": -1,
"max": 0x7FFFFFFFFFFFFFFF
}),
},
}
RETURN_TYPES = ("STRING", "INT", "INT") # String output, remaining cycles, current line number
RETURN_NAMES = ("text", "remaining_cycles", "current_line")
OUTPUT_IS_LIST = (True, False, False) # Only text output can be a list
FUNCTION = "select_line"
CATEGORY = "Bjornulf"
def select_line(self, text, line_number, RANDOM, LOOP, LOOP_SEQUENTIAL, jump, pick_random_variable, variables="", seed=-1):
# Parse variables
var_dict = {}
for line in variables.split('\n'):
if '=' in line:
key, value = line.split('=', 1)
var_dict[key.strip()] = value.strip()
# Replace variables in the text
for key, value in var_dict.items():
text = text.replace(f"<{key}>", value)
# Split the input text into lines, remove empty lines and lines starting with #
lines = [line.strip() for line in text.split('\n')
if line.strip() and not line.strip().startswith('#')]
if not lines:
return (["No valid lines found."], 0, 0)
import random
import os
# Set seed if provided
if seed >= 0:
random.seed(seed)
# Process random choice functionality if enabled
if pick_random_variable:
pattern = r'\{([^}]+)\}'
def replace_random(match):
return random.choice(match.group(1).split('|'))
lines = [re.sub(pattern, replace_random, line) for line in lines]
# Handle sequential looping
if LOOP_SEQUENTIAL:
counter_file = os.path.join("Bjornulf", "line_selector_counter.txt")
os.makedirs(os.path.dirname(counter_file), exist_ok=True)
try:
with open(counter_file, 'r') as f:
current_index = int(f.read().strip())
except (FileNotFoundError, ValueError):
current_index = -jump
next_index = current_index + jump
if next_index >= len(lines):
with open(counter_file, 'w') as f:
f.write(str(-jump))
raise ValueError(f"Counter has reached the last line (total lines: {len(lines)}). Counter has be reset.")
with open(counter_file, 'w') as f:
f.write(str(next_index))
remaining_cycles = max(0, (len(lines) - next_index - 1) // jump + 1)
return ([lines[next_index]], remaining_cycles, next_index + 1)
# Handle normal LOOP mode
if LOOP:
return (lines, len(lines), 0)
# Handle RANDOM or line_number selection
if RANDOM or line_number == 0:
selected = random.choice(lines)
else:
index = min(line_number - 1, len(lines) - 1)
index = max(0, index)
selected = lines[index]
return ([selected], 0, line_number if line_number > 0 else 0)
@classmethod
def IS_CHANGED(s, text, line_number, RANDOM, LOOP, LOOP_SEQUENTIAL, jump, pick_random_variable, variables="", seed=-1):
return float("NaN") if LOOP_SEQUENTIAL else (text, line_number, RANDOM, LOOP, LOOP_SEQUENTIAL, jump, pick_random_variable, variables, seed)
@PromptServer.instance.routes.post("/reset_line_selector_counter")
async def reset_line_selector_counter(request):
counter_file = os.path.join("Bjornulf", "line_selector_counter.txt")
try:
os.remove(counter_file)
return web.json_response({"success": True}, status=200)
except FileNotFoundError:
return web.json_response({"success": True}, status=200)
except Exception as e:
return web.json_response({"success": False, "error": str(e)}, status=500)
@PromptServer.instance.routes.post("/get_line_selector_counter")
async def get_line_selector_counter(request):
counter_file = os.path.join("Bjornulf", "line_selector_counter.txt")
try:
with open(counter_file, 'r') as f:
current_index = int(f.read().strip())
return web.json_response({"success": True, "value": current_index + 1}, status=200)
except (FileNotFoundError, ValueError):
return web.json_response({"success": True, "value": 0}, status=200)
except Exception as e:
return web.json_response({"success": False, "error": str(e)}, status=500)