-
Notifications
You must be signed in to change notification settings - Fork 107
/
Copy pathregionurlprovider.go
130 lines (104 loc) · 3.47 KB
/
regionurlprovider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package lksdk
import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
"time"
"google.golang.org/protobuf/encoding/protojson"
"github.com/livekit/protocol/livekit"
)
// regionHostnameProviderSettingsCacheTime is how long a cached set of region
// settings is considered fresh. RefreshRegionSettings skips the HTTP request
// while the cached entry for a hostname is younger than this.
const regionHostnameProviderSettingsCacheTime = 3 * time.Second
// regionURLProvider fetches region settings for cloud hostnames over HTTP and
// keeps the most recent result per hostname in an in-memory cache.
type regionURLProvider struct {
	// hostnameSettingsCache holds the last fetched settings for each hostname.
	hostnameSettingsCache map[string]*hostnameSettingsCacheItem // hostname -> regionSettings
	// mutex guards hostnameSettingsCache; PopBestURL also holds it while
	// trimming the region list of a cached item.
	mutex sync.RWMutex
	// httpClient issues the region settings requests.
	httpClient *http.Client
}
// hostnameSettingsCacheItem is one cache entry: the region settings fetched for
// a hostname plus bookkeeping about that fetch.
type hostnameSettingsCacheItem struct {
	// regionSettings is the decoded server response. PopBestURL removes
	// entries from its Regions list as they are handed out.
	regionSettings *livekit.RegionSettings
	// updatedAt is when this entry was created; it drives the freshness check
	// in RefreshRegionSettings.
	updatedAt time.Time
	// regionURLAttempts is initialized to an empty map on creation and is not
	// otherwise used in the visible code.
	// NOTE(review): presumably per-region-URL attempt counts — confirm against callers.
	regionURLAttempts map[string]int
}
func newRegionURLProvider() *regionURLProvider {
return ®ionURLProvider{
hostnameSettingsCache: make(map[string]*hostnameSettingsCacheItem),
httpClient: &http.Client{
Timeout: 5 * time.Second,
},
}
}
// RefreshRegionSettings fetches the region settings for cloudHostname and stores
// them in the cache, replacing any previous entry for that hostname.
//
// If the cached entry is younger than regionHostnameProviderSettingsCacheTime,
// no request is made and nil is returned. Otherwise it issues
// GET https://<cloudHostname>/settings/regions with token sent as a bearer
// credential and decodes the JSON response body into livekit.RegionSettings.
//
// An error is returned when the request cannot be created or sent, when the
// response status is not 200 OK, or when the body cannot be read or decoded.
// The cache is not modified in any of those cases. Errors from creating the
// request, reading the body and decoding are wrapped, so callers can inspect
// the cause with errors.Is / errors.As.
func (r *regionURLProvider) RefreshRegionSettings(cloudHostname, token string) error {
	r.mutex.RLock()
	hostnameSettings := r.hostnameSettingsCache[cloudHostname]
	r.mutex.RUnlock()

	// updatedAt is only assigned when an item is constructed below, so it can
	// be read here without holding the lock.
	if hostnameSettings != nil && time.Since(hostnameSettings.updatedAt) < regionHostnameProviderSettingsCacheTime {
		return nil
	}

	settingsURL := "https://" + cloudHostname + "/settings/regions"
	req, err := http.NewRequest(http.MethodGet, settingsURL, nil)
	if err != nil {
		return fmt.Errorf("refreshRegionSettings failed to create request: %w", err)
	}
	req.Header = http.Header{
		"Authorization": []string{"Bearer " + token},
	}

	resp, err := r.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return errors.New("refreshRegionSettings failed to fetch region settings. http status: " + resp.Status)
	}

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("refreshRegionSettings failed to read response body: %w", err)
	}

	regions := &livekit.RegionSettings{}
	if err := protojson.Unmarshal(respBody, regions); err != nil {
		return fmt.Errorf("refreshRegionSettings failed to decode region settings: %w", err)
	}

	// Inspect the region list before publishing the item: once it is in the
	// cache, PopBestURL may rewrite regionSettings.Regions under the write
	// lock, and reading it here without the lock would be a data race.
	noRegions := len(regions.Regions) == 0

	item := &hostnameSettingsCacheItem{
		regionSettings:    regions,
		updatedAt:         time.Now(),
		regionURLAttempts: map[string]int{},
	}
	r.mutex.Lock()
	r.hostnameSettingsCache[cloudHostname] = item
	r.mutex.Unlock()

	if noRegions {
		logger.Warnw("no regions returned", nil, "cloudHostname", cloudHostname)
	}
	return nil
}
// PopBestURL takes the first remaining region off the cached list for
// cloudHostname and returns its URL. Each call consumes one entry, so the list
// shrinks until it is empty, at which point an error is returned; the same
// error is returned when nothing is cached for cloudHostname.
// RefreshRegionSettings must be called to repopulate the list of regions.
//
// The token parameter is currently unused.
func (r *regionURLProvider) PopBestURL(cloudHostname, token string) (string, error) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	cached := r.hostnameSettingsCache[cloudHostname]
	if cached == nil || cached.regionSettings == nil {
		return "", errors.New("no regions available")
	}

	remaining := cached.regionSettings.Regions
	if len(remaining) == 0 {
		return "", errors.New("no regions available")
	}

	// Read the URL first, then drop the head so the next call sees the
	// following region.
	nextURL := remaining[0].Url
	cached.regionSettings.Regions = remaining[1:]
	return nextURL, nil
}
// parseCloudURL parses serverURL and returns its hostname (without port) when
// isCloud accepts that hostname.
//
// It returns an error wrapping the url.Parse failure if serverURL is malformed,
// and a "not a cloud url" error if the hostname is rejected by isCloud. A URL
// whose host component is empty is rejected the same way.
func parseCloudURL(serverURL string) (string, error) {
	parsedURL, err := url.Parse(serverURL)
	if err != nil {
		return "", fmt.Errorf("invalid server url (%s): %w", serverURL, err)
	}

	hostname := parsedURL.Hostname()
	if !isCloud(hostname) {
		return "", errors.New("not a cloud url")
	}
	return hostname, nil
}
// isCloud reports whether hostname is livekit.cloud or livekit.io, or a
// subdomain of either.
//
// The suffix match is anchored on a label boundary (a leading dot), so
// look-alike domains such as "evillivekit.cloud" or "notlivekit.io" are not
// treated as cloud hosts.
func isCloud(hostname string) bool {
	for _, domain := range []string{"livekit.cloud", "livekit.io"} {
		if hostname == domain || strings.HasSuffix(hostname, "."+domain) {
			return true
		}
	}
	return false
}