Skip to content

Commit 125ee52

Browse files
committed
gtfs implementation (wip)
0 parents  commit 125ee52

35 files changed

+10745
-0
lines changed

go.mod

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
module tidbyt.dev/gtfs
2+
3+
go 1.21.1
4+
5+
require (
6+
github.com/gocarina/gocsv v0.0.0-20230616125104-99d496ca653d
7+
github.com/mattn/go-sqlite3 v1.14.17
8+
github.com/pkg/errors v0.9.1
9+
github.com/spf13/cobra v1.7.0
10+
github.com/spkg/bom v1.0.0
11+
github.com/stretchr/testify v1.8.4
12+
)
13+
14+
require (
15+
github.com/davecgh/go-spew v1.1.1 // indirect
16+
github.com/inconshreveable/mousetrap v1.1.0 // indirect
17+
github.com/lib/pq v1.10.9 // indirect
18+
github.com/pmezard/go-difflib v1.0.0 // indirect
19+
github.com/spf13/pflag v1.0.5 // indirect
20+
gopkg.in/yaml.v3 v3.0.1 // indirect
21+
)

go.sum

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
2+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5+
github.com/gocarina/gocsv v0.0.0-20230616125104-99d496ca653d h1:KbPOUXFUDJxwZ04vbmDOc3yuruGvVO+LOa7cVER3yWw=
6+
github.com/gocarina/gocsv v0.0.0-20230616125104-99d496ca653d/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
7+
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
8+
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
9+
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
10+
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
11+
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
12+
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
13+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
14+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
15+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
16+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
17+
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
18+
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
19+
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
20+
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
21+
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
22+
github.com/spkg/bom v1.0.0 h1:S939THe0ukL5WcTGiGqkgtaW5JW+O6ITaIlpJXTYY64=
23+
github.com/spkg/bom v1.0.0/go.mod h1:lAz2VbTuYNcvs7iaFF8WW0ufXrHShJ7ck1fYFFbVXJs=
24+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
25+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
26+
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
27+
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
28+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
29+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
30+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
31+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
32+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

manager.go

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
package gtfs
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"errors"
7+
"fmt"
8+
"io/ioutil"
9+
"net/http"
10+
"sort"
11+
"time"
12+
13+
"tidbyt.dev/gtfs/parse"
14+
"tidbyt.dev/gtfs/storage"
15+
)
16+
17+
// DefaultStaticRefreshInterval is the RefreshInterval a Manager created
// by NewManager starts out with.
const DefaultStaticRefreshInterval = 12 * time.Hour
18+
19+
// ErrNoActiveFeed is returned by the Load* methods when storage holds
// no feed for the URL that is active at the requested time.
var ErrNoActiveFeed = errors.New("no active feed found")
20+
21+
// Manager manages GTFS data.
22+
type Manager struct {
23+
RefreshInterval time.Duration
24+
storage storage.Storage
25+
}
26+
27+
func NewManager(storage storage.Storage) *Manager {
28+
return &Manager{
29+
storage: storage,
30+
RefreshInterval: DefaultStaticRefreshInterval,
31+
}
32+
}
33+
34+
// Loads GTFS data from a URL
35+
//
36+
// If a feed is available in storage, and active at the given time, it
37+
// is returned immediately. Otherwise, ErrNoActiveFeed is returned.
38+
//
39+
// If the URL is previously unseen, a marker is left in storage for a
40+
// later call to RefreshFeeds() to retrieve it.
41+
func (m *Manager) LoadStaticAsync(url string, when time.Time) (*Static, error) {
42+
43+
feeds, err := m.storage.ListFeeds(storage.ListFeedsFilter{URL: url})
44+
if err != nil {
45+
return nil, fmt.Errorf("listing feeds: %w", err)
46+
}
47+
48+
if len(feeds) == 0 {
49+
fmt.Println("No feeds found. Adding record to request it.")
50+
// No feeds found. Add record to request it.
51+
err = m.storage.WriteFeedMetadata(&storage.FeedMetadata{
52+
URL: url,
53+
})
54+
if err != nil {
55+
return nil, fmt.Errorf("writing feed metadata: %w", err)
56+
}
57+
return nil, ErrNoActiveFeed
58+
}
59+
60+
return m.loadMostRecentActive(feeds, when)
61+
}
62+
63+
// Loads GTFS data from a URL
64+
//
65+
// If the URL is previously unseen, it is retrieved immediately.
66+
//
67+
// If no feed is active at the given time, ErrNoActiveFeed is
68+
// returned.
69+
func (m *Manager) LoadStatic(url string, when time.Time) (*Static, error) {
70+
71+
// All feeds for URL, sorted by retrieval time
72+
feeds, err := m.storage.ListFeeds(storage.ListFeedsFilter{URL: url})
73+
if err != nil {
74+
return nil, fmt.Errorf("listing feeds: %w", err)
75+
}
76+
77+
sort.Slice(feeds, func(i, j int) bool {
78+
return feeds[i].RetrievedAt.Before(feeds[j].RetrievedAt)
79+
})
80+
81+
// If we don't have it, get it
82+
if len(feeds) == 0 {
83+
metadata, err := m.refreshStatic(url)
84+
if err != nil {
85+
return nil, fmt.Errorf("refreshing static: %w", err)
86+
}
87+
88+
err = m.storage.WriteFeedMetadata(metadata)
89+
if err != nil {
90+
return nil, fmt.Errorf("writing metadata: %w", err)
91+
}
92+
93+
feeds = []*storage.FeedMetadata{metadata}
94+
}
95+
96+
return m.loadMostRecentActive(feeds, when)
97+
}
98+
99+
// Refreshes any feeds that might need refreshing.
100+
func (m *Manager) Refresh(ctx context.Context) error {
101+
102+
// Get all feeds, group by URL
103+
feeds, err := m.storage.ListFeeds(storage.ListFeedsFilter{})
104+
if err != nil {
105+
return fmt.Errorf("listing feeds: %w", err)
106+
}
107+
feedsByURL := make(map[string][]*storage.FeedMetadata)
108+
for _, feed := range feeds {
109+
feedsByURL[feed.URL] = append(feedsByURL[feed.URL], feed)
110+
}
111+
112+
// Process each URL
113+
for url, feeds := range feedsByURL {
114+
err = m.refreshFeeds(url, feeds)
115+
if err != nil {
116+
return fmt.Errorf("refreshing %s: %w", url, err)
117+
}
118+
}
119+
120+
return nil
121+
}
122+
123+
func (m *Manager) refreshFeeds(url string, feeds []*storage.FeedMetadata) error {
124+
// If there's only one record, and it has no SHA256, it's an
125+
// async request for feed retrieval.
126+
if len(feeds) == 1 && feeds[0].SHA256 == "" {
127+
fmt.Printf("Refreshing async %s\n", url)
128+
metadata, err := m.refreshStatic(url)
129+
if err != nil {
130+
return fmt.Errorf("refreshing static at %s: %w", url, err)
131+
}
132+
133+
err = m.storage.WriteFeedMetadata(metadata)
134+
if err != nil {
135+
return fmt.Errorf("writing metadata: %w", err)
136+
}
137+
138+
// delete the async request
139+
err = m.storage.DeleteFeedMetadata(url, "")
140+
if err != nil {
141+
return fmt.Errorf("deleting metadata: %w", err)
142+
}
143+
144+
return nil
145+
}
146+
147+
fmt.Printf("Refreshing existing %s\n", url)
148+
149+
// If the most recently retrieved feed is outdated, it's
150+
// refresh time
151+
sort.Slice(feeds, func(i, j int) bool {
152+
return feeds[j].RetrievedAt.Before(feeds[i].RetrievedAt)
153+
})
154+
if !feeds[0].RetrievedAt.IsZero() && feeds[0].RetrievedAt.Add(m.RefreshInterval).Before(time.Now()) {
155+
metadata, err := m.refreshStatic(url)
156+
if err != nil {
157+
return fmt.Errorf("refreshing static at %s: %w", url, err)
158+
}
159+
160+
err = m.storage.WriteFeedMetadata(metadata)
161+
if err != nil {
162+
return fmt.Errorf("writing metadata: %w", err)
163+
}
164+
}
165+
166+
return nil
167+
}
168+
169+
// Refreshes a static feed from a URL. Returns the feed metadata if
170+
// successful. Note that the feed may already be in storage from a
171+
// previous refresh.
172+
func (m *Manager) refreshStatic(url string) (*storage.FeedMetadata, error) {
173+
174+
// TODO: add support for ETag?
175+
176+
// GET the feed
177+
client := http.Client{Timeout: 60 * time.Second}
178+
resp, err := client.Get(url)
179+
if err != nil {
180+
return nil, fmt.Errorf("downloading: %w", err)
181+
}
182+
defer resp.Body.Close()
183+
184+
// Compute SHA256 of body
185+
body, err := ioutil.ReadAll(resp.Body)
186+
if err != nil {
187+
return nil, fmt.Errorf("reading: %w", err)
188+
}
189+
hash := fmt.Sprintf("%x", sha256.Sum256(body))
190+
191+
// Check if this exact feed is already in storage
192+
feeds, err := m.storage.ListFeeds(storage.ListFeedsFilter{SHA256: hash})
193+
if err != nil {
194+
return nil, fmt.Errorf("listing feeds: %w", err)
195+
}
196+
if len(feeds) > 0 {
197+
for _, feed := range feeds {
198+
if feed.URL != url {
199+
// Found, but from a different
200+
// URL. Add a record for this URL so
201+
// future lookups can find it.
202+
feed.URL = url
203+
feed.UpdatedAt = time.Now()
204+
err = m.storage.WriteFeedMetadata(feed)
205+
if err != nil {
206+
return nil, fmt.Errorf("writing metadata: %w", err)
207+
}
208+
209+
return feed, nil
210+
}
211+
}
212+
213+
// Found, and from the same URL. Update timestamp
214+
// indicating last refresh, and return.
215+
feeds[0].UpdatedAt = time.Now()
216+
err = m.storage.WriteFeedMetadata(feeds[0])
217+
if err != nil {
218+
return nil, fmt.Errorf("writing metadata: %w", err)
219+
}
220+
221+
return feeds[0], nil
222+
}
223+
224+
// Feed is brand new to us. Parse and write to storage.
225+
writer, err := m.storage.GetWriter(hash)
226+
if err != nil {
227+
return nil, fmt.Errorf("getting writer: %w", err)
228+
}
229+
defer writer.Close()
230+
231+
metadata, err := parse.ParseStatic(writer, body)
232+
if err != nil {
233+
// Parse failure is special. If something fails to
234+
// parse now, there's no reason to retry
235+
// soon. Instead, we treat parse failure as if the
236+
// data simply hasn't been updated.
237+
feeds, listErr := m.storage.ListFeeds(storage.ListFeedsFilter{URL: url})
238+
if listErr != nil {
239+
return nil, fmt.Errorf("listing feeds: %w", listErr)
240+
}
241+
if len(feeds) > 0 {
242+
sort.Slice(feeds, func(i, j int) bool {
243+
return feeds[i].RetrievedAt.After(feeds[j].RetrievedAt)
244+
})
245+
feeds[0].UpdatedAt = time.Now()
246+
writeErr := m.storage.WriteFeedMetadata(feeds[0])
247+
if writeErr != nil {
248+
return nil, fmt.Errorf("writing metadata: %w", writeErr)
249+
}
250+
}
251+
252+
return nil, fmt.Errorf("parsing feed: %w", err)
253+
}
254+
255+
metadata.SHA256 = hash
256+
metadata.URL = url
257+
metadata.RetrievedAt = time.Now()
258+
metadata.UpdatedAt = metadata.RetrievedAt
259+
260+
return metadata, nil
261+
}
262+
263+
func feedActive(feed *storage.FeedMetadata, now time.Time) (bool, error) {
264+
feedTz, err := time.LoadLocation(feed.Timezone)
265+
if err != nil {
266+
return false, fmt.Errorf("loading timezone: %w", err)
267+
}
268+
269+
nowThere := now.In(feedTz)
270+
todayThere := time.Date(
271+
nowThere.Year(),
272+
nowThere.Month(),
273+
nowThere.Day(),
274+
0, 0, 0, 0,
275+
feedTz,
276+
).Format("20060102")
277+
278+
if feed.FeedStartDate != "" && feed.FeedStartDate > todayThere {
279+
return false, nil
280+
}
281+
if feed.FeedEndDate != "" && feed.FeedEndDate < todayThere {
282+
return false, nil
283+
}
284+
if feed.CalendarStartDate > todayThere {
285+
return false, nil
286+
}
287+
if feed.CalendarEndDate < todayThere {
288+
return false, nil
289+
}
290+
291+
return true, nil
292+
}
293+
294+
// Selects he most recently retrieved feed from feeds that is also
295+
// active at the given time.
296+
func (m *Manager) loadMostRecentActive(feeds []*storage.FeedMetadata, when time.Time) (*Static, error) {
297+
sort.Slice(feeds, func(i, j int) bool {
298+
return feeds[i].RetrievedAt.Before(feeds[j].RetrievedAt)
299+
})
300+
301+
for i := len(feeds) - 1; i >= 0; i-- {
302+
fmt.Printf("Considering feed %s %s\n", feeds[i].URL, feeds[i].SHA256)
303+
304+
ok, err := feedActive(feeds[i], when)
305+
if err != nil {
306+
return nil, fmt.Errorf("checking if feed is active: %w", err)
307+
}
308+
if !ok {
309+
fmt.Printf("Feed %s is not active\n", feeds[i].URL)
310+
continue
311+
}
312+
313+
// This is the one!
314+
reader, err := m.storage.GetReader(feeds[i].SHA256)
315+
if err != nil {
316+
return nil, fmt.Errorf("getting reader: %w", err)
317+
}
318+
static, err := NewStatic(reader, feeds[i])
319+
if err != nil {
320+
return nil, fmt.Errorf("creating static: %w", err)
321+
}
322+
return static, nil
323+
}
324+
325+
// No active feed found.
326+
return nil, ErrNoActiveFeed
327+
}
328+
329+
func (m *Manager) buildStatic(feed *storage.FeedMetadata) (*Static, error) {
330+
reader, err := m.storage.GetReader(feed.SHA256)
331+
if err != nil {
332+
return nil, fmt.Errorf("getting reader: %w", err)
333+
}
334+
static, err := NewStatic(reader, feed)
335+
if err != nil {
336+
return nil, fmt.Errorf("creating static: %w", err)
337+
}
338+
return static, nil
339+
}

0 commit comments

Comments
 (0)