Use TCP for remote scraping
Wikidepia committed Jan 27, 2025
1 parent 077c05c commit 7b2af6f
Showing 5 changed files with 139 additions and 51 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/klauspost/compress v1.17.11
 	github.com/tdewolff/parse/v2 v2.7.19
 	github.com/tidwall/gjson v1.18.0
+	github.com/xtaci/smux v1.5.33
 	go.etcd.io/bbolt v1.3.11
 	golang.org/x/image v0.22.0
 	golang.org/x/net v0.31.0
2 changes: 2 additions & 0 deletions go.sum
@@ -31,6 +31,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
 github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
 github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
 github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
+github.com/xtaci/smux v1.5.33 h1:xosoZt0AUZdIXEB6z09kt1bge+l1L8wzMtJdPB6GAPI=
+github.com/xtaci/smux v1.5.33/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY=
 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
 go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
 go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
51 changes: 9 additions & 42 deletions handlers/scraper/data.go
@@ -16,7 +16,6 @@ import (
 	"github.com/PuerkitoBio/goquery"
 	"github.com/kelindar/binary"
 	"github.com/klauspost/compress/gzhttp"
-	"github.com/klauspost/compress/zstd"
 	"github.com/tdewolff/parse/v2"
 	"github.com/tdewolff/parse/v2/js"
 	"github.com/tidwall/gjson"
@@ -26,13 +25,11 @@ import (
 )
 
 var (
-	RemoteScraperAddr string
-	ErrNotFound       = errors.New("post not found")
-	timeout           = 5 * time.Second
-	transport         http.RoundTripper
-	transportNoProxy  *http.Transport
-	sflightScraper    singleflight.Group
-	remoteZSTDReader  *zstd.Decoder
+	ErrNotFound      = errors.New("post not found")
+	timeout          = 5 * time.Second
+	transport        http.RoundTripper
+	transportNoProxy *http.Transport
+	sflightScraper   singleflight.Group
 )
 
 //go:embed dictionary.bin
@@ -51,15 +48,9 @@ type InstaData struct {
 }
 
 func init() {
-	var err error
 	transport = gzhttp.Transport(http.DefaultTransport, gzhttp.TransportAlwaysDecompress(true))
 	transportNoProxy = http.DefaultTransport.(*http.Transport).Clone()
 	transportNoProxy.Proxy = nil // Skip any proxy
-
-	remoteZSTDReader, err = zstd.NewReader(nil, zstd.WithDecoderLowmem(true), zstd.WithDecoderDicts(zstdDict))
-	if err != nil {
-		panic(err)
-	}
 }
 
 func GetData(postID string) (*InstaData, error) {
@@ -147,34 +138,10 @@ func GetData(postID string) (*InstaData, error) {
 
 func (i *InstaData) ScrapeData() error {
 	// Scrape from remote scraper if available
-	if len(RemoteScraperAddr) > 0 {
-		remoteClient := http.Client{Transport: transportNoProxy, Timeout: timeout}
-		req, err := http.NewRequest("GET", RemoteScraperAddr+"/scrape/"+i.PostID, nil)
-		if err != nil {
-			return err
-		}
-		req.Header.Set("Accept-Encoding", "zstd.dict")
-		res, err := remoteClient.Do(req)
-		if err == nil && res != nil {
-			defer res.Body.Close()
-			remoteData, err := io.ReadAll(res.Body)
-			if err == nil && res.StatusCode == 200 {
-				remoteDecomp, err := remoteZSTDReader.DecodeAll(remoteData, nil)
-				if err != nil {
-					return err
-				}
-				if err := binary.Unmarshal(remoteDecomp, i); err == nil {
-					if len(i.Username) > 0 {
-						slog.Info("Data parsed from remote scraper", "postID", i.PostID)
-						return nil
-					}
-				}
-			}
-			slog.Error("Failed to scrape data from remote scraper", "postID", i.PostID, "status", res.StatusCode, "err", err)
-		}
-		if err != nil {
-			slog.Error("Failed when trying to scrape data from remote scraper", "postID", i.PostID, "err", err)
-		}
-	}
+	if err := ScrapeRemote(i); err == nil {
+		return nil
+	} else {
+		slog.Error("Failed to scrape data from remote scraper", "postID", i.PostID, "err", err)
+	}
 
 	client := http.Client{Transport: transport, Timeout: timeout}
127 changes: 127 additions & 0 deletions handlers/scraper/remote.go
@@ -0,0 +1,127 @@
+package handlers
+
+import (
+	"errors"
+	"log/slog"
+	"net"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/kelindar/binary"
+	"github.com/xtaci/smux"
+)
+
+type remoteResult struct {
+	instaData *InstaData
+	outChan   chan error
+}
+
+var sessCount atomic.Int32
+var inChan chan remoteResult
+
+func init() {
+	inChan = make(chan remoteResult)
+
+	ln, err := net.Listen("tcp", "0.0.0.0:4444")
+	if err != nil {
+		return
+	}
+	slog.Info("remote scraper is listening on", "address", ln.Addr())
+
+	go func() {
+		for {
+			conn, err := ln.Accept()
+			if err != nil {
+				return
+			}
+
+			go handleConnection(conn)
+		}
+	}()
+}
+
+func handleConnection(conn net.Conn) {
+	smuxConfig := smux.DefaultConfig()
+	smuxConfig.Version = 2
+
+	session, err := smux.Server(conn, smuxConfig)
+	if err != nil {
+		return
+	}
+	defer session.Close()
+	defer sessCount.Add(-1)
+
+	sessCount.Add(1)
+	var wg sync.WaitGroup
+	for {
+		stream, err := session.AcceptStream()
+		if err != nil {
+			break
+		}
+
+		wg.Add(1)
+		go func(stream *smux.Stream) {
+			defer func() {
+				stream.Close()
+				wg.Done()
+			}()
+
+			for rm := range inChan {
+				if err := stream.SetDeadline(time.Now().Add(3 * time.Second)); err != nil {
+					slog.Error("failed to set deadline", "err", err)
+					rm.outChan <- err
+					return
+				}
+
+				buf := []byte(rm.instaData.PostID)
+				if _, err = stream.Write(buf); err != nil {
+					slog.Error("failed to write to stream", "err", err)
+					rm.outChan <- err
+					return
+				}
+
+				outBuf := make([]byte, 1024*1024)
+				n, err := stream.Read(outBuf)
+				if err != nil {
+					slog.Error("failed to read from stream", "err", err)
+					rm.outChan <- err
+					return
+				}
+
+				if err = binary.Unmarshal(outBuf[:n], rm.instaData); err != nil {
+					slog.Error("failed to unmarshal data", "err", err)
+					rm.outChan <- err
+					return
+				}
+				rm.outChan <- nil
+			}
+		}(stream)
+	}
+
+	wg.Wait()
+}
+
+func ScrapeRemote(i *InstaData) error {
+	if sessCount.Load() == 0 {
+		return errors.New("remote scraper is not running")
+	}
+
+	remoteRes := remoteResult{
+		instaData: i,
+		outChan:   make(chan error),
+	}
+
+	select {
+	case inChan <- remoteRes:
+	case <-time.After(time.Second):
+		return errors.New("no remote scraper is ready")
+	}
+
+	select {
+	case err := <-remoteRes.outChan:
+		return err
+	case <-time.After(5 * time.Second):
+		return errors.New("failed to get data from remote scraper")
+	}
+}
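
Note: the new handlers/scraper/remote.go above defines a simple pull protocol. A worker dials the listener on port 4444, wraps the TCP connection in an smux v2 session, and opens streams; on each stream the InstaFix server writes a raw post ID and expects one binary-encoded InstaData blob (kelindar/binary, read into a 1 MiB buffer) in reply. The sketch below shows what such a worker might look like. It is only an illustration, not the actual Wikidepia/InstaFix-remote-scraper implementation; the dial address, the scrape helper, and the InstaData field layout are assumptions, and since kelindar/binary is field-order sensitive, a real worker would have to mirror the struct from handlers/scraper/data.go exactly.

// Hypothetical worker-side sketch -- NOT code from this commit or from
// Wikidepia/InstaFix-remote-scraper. It only mirrors the protocol visible
// in handleConnection above.
package main

import (
	"log"
	"net"

	"github.com/kelindar/binary"
	"github.com/xtaci/smux"
)

// InstaData is an assumed field layout for illustration; a real worker must
// copy the struct from handlers/scraper/data.go so the encoding stays compatible.
type InstaData struct {
	PostID   string
	Username string
	Caption  string
}

func main() {
	// Assumed address: the server in this commit listens on 0.0.0.0:4444.
	conn, err := net.Dial("tcp", "instafix.example.com:4444")
	if err != nil {
		log.Fatal(err)
	}

	cfg := smux.DefaultConfig()
	cfg.Version = 2 // must match the smux version set by handleConnection
	session, err := smux.Client(conn, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// The server accepts streams, so the worker opens one and serves
	// request/response pairs on it until the server closes the session.
	stream, err := session.OpenStream()
	if err != nil {
		log.Fatal(err)
	}
	defer stream.Close()

	buf := make([]byte, 256)
	for {
		// One read carries one post ID (the server writes it as a single chunk).
		n, err := stream.Read(buf)
		if err != nil {
			return
		}
		postID := string(buf[:n])

		payload, err := binary.Marshal(scrape(postID))
		if err != nil {
			return
		}
		// The reply must fit in the server's 1 MiB read buffer.
		if _, err := stream.Write(payload); err != nil {
			return
		}
	}
}

// scrape is a placeholder for the worker's actual scraping logic.
func scrape(postID string) InstaData {
	return InstaData{PostID: postID, Username: "example"}
}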
9 changes: 0 additions & 9 deletions main.go
@@ -27,18 +27,9 @@ func init() {
 func main() {
 	listenAddr := flag.String("listen", "0.0.0.0:3000", "Address to listen on")
 	gridCacheMaxFlag := flag.String("grid-cache-entries", "1024", "Maximum number of grid images to cache")
-	remoteScraperAddr := flag.String("remote-scraper", "", "Remote scraper address (https://github.com/Wikidepia/InstaFix-remote-scraper)")
 	videoProxyAddr := flag.String("video-proxy-addr", "", "Video proxy address (https://github.com/Wikidepia/InstaFix-proxy)")
 	flag.Parse()
 
-	// Initialize remote scraper
-	if *remoteScraperAddr != "" {
-		if !strings.HasPrefix(*remoteScraperAddr, "http") {
-			panic("Remote scraper address must start with http:// or https://")
-		}
-		scraper.RemoteScraperAddr = *remoteScraperAddr
-	}
-
 	// Initialize video proxy
 	if *videoProxyAddr != "" {
 		if !strings.HasPrefix(*videoProxyAddr, "http") {
