-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathosc.py
More file actions
123 lines (113 loc) · 5.42 KB
/
osc.py
File metadata and controls
123 lines (113 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import threading
import time
from helpers import *
from pythonosc import udp_client
import logging
import numpy as np
# ----------------------------
# Post-Process & OSC Sender Task
# ----------------------------
class OSCSenderTask(threading.Thread):
    """Daemon thread that post-processes tracking frames and sends them over OSC.

    Every item taken from ``result_queue`` is a dict that may carry gaze
    pairs (``"t_comb"``, or ``"tL"`` and ``"tR"``, each unpacked as ``(y, x)``)
    and eyelid openness (``"oL"`` / ``"oR"``).  For each frame the thread
    snapshots ``shared`` under ``lock``, applies the blink-release hold and
    sends the values to the addresses of the active output mode.  A value is
    only transmitted when it differs from the last one sent to that address.

    Args:
        result_queue: Queue-like object; ``get()`` blocks until a frame dict
            is available.
        shared: Mapping with the live configuration (``"vrcOsc"``, mode
            flags, ``"blinkReleaseDelayMs"``, ...).
        lock: Context manager guarding ``shared``.
    """

    def __init__(self, result_queue, shared, lock):
        super().__init__(daemon=True)
        self.queue = result_queue
        self.shared = shared
        self.lock = lock
        # Last values sent per OSC address, used to suppress duplicates.
        self.prev = {}
        # Time of the most recent fully-closed (== 0) sample per channel.
        self.blink_ts = {"left": 0, "right": 0, "combined": 0}

    def run(self):
        """Consume frames forever; this method never returns normally."""
        client = None
        endpoint = None  # raw "host:port" string the client was last built from
        while True:
            data = self.queue.get()
            with self.lock:
                cfg = dict(self.shared)

            # Compare against the raw config string instead of the client's
            # private attributes, so "host:09000" does not reconnect per frame.
            wanted = cfg["vrcOsc"]
            if wanted != endpoint:
                endpoint = wanted  # remember failures too: log once, not per frame
                fresh = self._open_client(wanted)
                if fresh is not None:
                    client = fresh
                    # A new receiver has seen nothing yet; resend everything.
                    self.prev.clear()

            # Keep blink state current even if the frame cannot be sent.
            self._hold_blinks(data, cfg, time.time())
            if client is None:
                continue
            self._publish(client, data, cfg)

    @staticmethod
    def _open_client(endpoint):
        """Build a UDP client from ``"host:port"``; return None if unusable."""
        try:
            # rsplit keeps hosts that themselves contain ":" in one piece.
            host, port = endpoint.rsplit(":", 1)
            client = udp_client.SimpleUDPClient(host, int(port))
        except (ValueError, OSError):
            logging.error("Ignoring invalid OSC endpoint %r", endpoint)
            return None
        logging.info("OSC endpoint set to %s:%s", host, port)
        return client

    def _hold_blinks(self, data, cfg, now):
        """Keep openness at 0 for ``blinkReleaseDelayMs`` after a closed sample.

        Mutates ``data`` in place.  Frames that carry no openness value for an
        eye are left untouched, so the hold never adds keys to a frame.
        """
        hold = cfg.get("blinkReleaseDelayMs", 0) / 1000.0
        for eye, key in (("left", "oL"), ("right", "oR")):
            if key not in data:
                continue
            if data[key] == 0:
                self.blink_ts[eye] = now
            elif now <= self.blink_ts[eye] + hold:
                data[key] = 0

        if "oL" in data and not cfg.get("independentOpenness", False):
            # NOTE(review): this reads "oL" after the per-eye hold above, so
            # held frames keep refreshing the combined timestamp and the
            # effective combined hold is roughly twice the configured delay.
            # Timing is left as-is; confirm whether that is intended.
            if data["oL"] == 0:
                self.blink_ts["combined"] = now
            elif now <= self.blink_ts["combined"] + hold:
                data["oL"] = data["oR"] = 0

    def _publish(self, client, data, cfg):
        """Send ``data`` using the first enabled mode; forced-offline wins."""
        if cfg.get("trackingForcedOffline", False):
            return
        if cfg.get("vrcNative", False):
            self._send_native(client, data, cfg)
        elif cfg.get("vrcftV1", False):
            self._send_v1(client, data, cfg)
        elif cfg.get("vrcftV2", False):
            self._send_v2(client, data, cfg)

    @staticmethod
    def _gaze(data, cfg):
        """Return ``(yL, xL, yR, xR)`` for the configured eye mode, or None."""
        if cfg.get("independentEyes", False):
            if "tL" in data and "tR" in data:
                y_left, x_left = data["tL"]
                y_right, x_right = data["tR"]
                return y_left, x_left, y_right, x_right
            return None
        if "t_comb" in data:
            y, x = data["t_comb"]
            return y, x, y, x
        return None

    def _send_native(self, client, data, cfg):
        """Send gaze and openness to the ``/avatar/eye/native/*`` addresses."""
        if cfg.get("independentEyes", False):
            gaze = self._gaze(data, cfg)
            if gaze is not None:
                self._send(client, "/avatar/eye/native/independent", *gaze)
        elif "t_comb" in data:
            self._send(client, "/avatar/eye/native/combined", *data["t_comb"])
        if "oL" in data:
            self._send(client, "/avatar/eye/native/openness", data["oL"])

    def _send_v1(self, client, data, cfg):
        """Send gaze and eyelids using the V1 parameter names."""
        gaze = self._gaze(data, cfg)
        if gaze is not None:
            y_left, x_left, _, x_right = gaze
            self._send(client, "/avatar/parameters/LeftEyeX", x_left)
            self._send(client, "/avatar/parameters/RightEyeX", x_right)
            # V1 has a single vertical parameter; the left eye's Y drives it.
            self._send(client, "/avatar/parameters/EyesY", -y_left)
        if "oL" in data:
            if cfg.get("independentOpenness", False):
                self._send(client, "/avatar/parameters/LeftEyeLid", data["oL"])
                if "oR" in data:
                    self._send(client, "/avatar/parameters/RightEyeLid", data["oR"])
            else:
                self._send(client, "/avatar/parameters/CombinedEyeLid", data["oL"])

    def _send_v2(self, client, data, cfg):
        """Send gaze and eyelids using the V2 names under the optional prefix."""
        pfx = (cfg.get("oscPrefix") or "").strip("/")
        base = f"/avatar/parameters/{pfx}/v2/" if pfx else "/avatar/parameters/v2/"
        gaze = self._gaze(data, cfg)
        if gaze is not None:
            y_left, x_left, y_right, x_right = gaze
            if cfg.get("splitOutputY", False):
                self._send(client, base + "EyeLeftX", x_left)
                self._send(client, base + "EyeLeftY", -y_left)
                self._send(client, base + "EyeRightX", x_right)
                self._send(client, base + "EyeRightY", -y_right)
            else:
                self._send(client, base + "EyeLeftX", x_left)
                self._send(client, base + "EyeRightX", x_right)
                # Without split output the left eye's Y drives the shared axis.
                self._send(client, base + "EyeY", -y_left)
        if "oL" in data:
            independent = cfg.get("independentOpenness", False)
            if not independent:
                right_lid = data["oL"]
            else:
                right_lid = data.get("oR")
            self._send(client, base + "EyeLidLeft", data["oL"])
            if right_lid is not None or not independent:
                self._send(client, base + "EyeLidRight", right_lid)

    @staticmethod
    def _plain(value):
        """Convert a NumPy scalar to a built-in float/int; pass others through."""
        if isinstance(value, np.generic):
            return float(value) if value.dtype.kind == "f" else int(value)
        return value

    def _send(self, client, key, *vals):
        """Send ``vals`` to address ``key`` unless identical to the last send."""
        cleaned = tuple(self._plain(v) for v in vals)
        if self.prev.get(key) != cleaned:
            self.prev[key] = cleaned
            client.send_message(key, list(cleaned))