Skip to content

Commit

Permalink
nb
Browse files Browse the repository at this point in the history
  • Loading branch information
Wikidepia committed Jan 25, 2025
1 parent 4a67803 commit 7c2355b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 24 deletions.
2 changes: 2 additions & 0 deletions handlers/scraper/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func (i *InstaData) ScrapeData() error {
// Scrape from remote scraper if available
if err := ScrapeRemote(i); err == nil {
return nil
} else {
slog.Error("Failed to scrape data from remote scraper", "postID", i.PostID, "err", err)
}

client := http.Client{Transport: transport, Timeout: timeout}
Expand Down
31 changes: 7 additions & 24 deletions handlers/scraper/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"net"
"sync/atomic"
"syscall"
"time"

"github.com/kelindar/binary"
Expand All @@ -17,11 +15,9 @@ type remoteResult struct {
outChan chan error
}

var connStream atomic.Int32
var inChan chan remoteResult

func init() {
connStream.Store(0)
inChan = make(chan remoteResult)

ln, err := net.Listen("tcp", "localhost:4444")
Expand Down Expand Up @@ -54,34 +50,25 @@ func handleConnection(conn net.Conn) {
break
}

buf := make([]byte, 64*1024)
go func() {
connStream.Add(1)
defer connStream.Add(-1)
for {
select {
case rm := <-inChan:
buf = []byte(rm.instaData.PostID)
buf := []byte(rm.instaData.PostID)
if _, err = stream.Write(buf); err != nil {
if errors.Is(err, syscall.EPIPE) {
stream.Close()
}
rm.outChan <- err
return
}

var n int
outBuf := make([]byte, 128*1024)
if n, err = stream.Read(outBuf); err != nil {
if errors.Is(err, syscall.EPIPE) {
stream.Close()
}
outBuf := make([]byte, 1024*1024)
if n, err := stream.Read(outBuf); err == nil {
err = binary.Unmarshal(outBuf[:n], rm.instaData)
rm.outChan <- err
} else {
rm.outChan <- err
return
}

err = binary.Unmarshal(outBuf[:n], rm.instaData)
rm.outChan <- err
case <-stream.GetDieCh():
return
}
Expand All @@ -91,10 +78,6 @@ func handleConnection(conn net.Conn) {
}

func ScrapeRemote(i *InstaData) error {
if connStream.Load() == 0 {
return errors.New("Remote scraper is not running")
}

remoteRes := remoteResult{
instaData: i,
outChan: make(chan error),
Expand All @@ -103,7 +86,7 @@ func ScrapeRemote(i *InstaData) error {
select {
case inChan <- remoteRes:
default:
return nil
return errors.New("remote scraper is not running")
}

select {
Expand Down

0 comments on commit 7c2355b

Please sign in to comment.