Skip to content

Commit 005f904

Browse files
authored
Added initial support for P2P-based image distribution (#2303)
Signed-off-by: Yinuo Deng <[email protected]>
1 parent 2366836 commit 005f904

File tree

4,183 files changed

+750324
-32300
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

4,183 files changed

+750324
-32300
lines changed

cmd/dist-receiver/main.go

+316
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
// Copyright © 2023 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"archive/tar"
19+
"compress/gzip"
20+
"context"
21+
b64 "encoding/base64"
22+
"flag"
23+
"fmt"
24+
"io"
25+
"net/http"
26+
"os"
27+
"path/filepath"
28+
"strings"
29+
"sync"
30+
"time"
31+
32+
config "github.com/ipfs/go-ipfs-config"
33+
files "github.com/ipfs/go-ipfs-files"
34+
"github.com/ipfs/go-ipfs/core"
35+
"github.com/ipfs/go-ipfs/core/coreapi"
36+
"github.com/ipfs/go-ipfs/core/node/libp2p"
37+
"github.com/ipfs/go-ipfs/plugin/loader"
38+
"github.com/ipfs/go-ipfs/repo/fsrepo"
39+
icore "github.com/ipfs/interface-go-ipfs-core"
40+
icorepath "github.com/ipfs/interface-go-ipfs-core/path"
41+
"github.com/libp2p/go-libp2p-core/peer"
42+
ma "github.com/multiformats/go-multiaddr"
43+
)
44+
45+
var stageNow int
46+
var sig chan struct{}
47+
var targets []string
48+
49+
func main() {
50+
bootstrap := flag.String("bootstrap", "", "Specify the bootstrap node")
51+
cidArg := flag.String("cid", "", "Specify the CID")
52+
fileName := flag.String("filename", "", "Specify the name of the file to be distributed")
53+
targetDir := flag.String("target", "", "Specify the target directory")
54+
55+
flag.Parse()
56+
57+
stageNow = 0
58+
sig = make(chan struct{})
59+
60+
server := &http.Server{
61+
Addr: "0.0.0.0:4002",
62+
ReadTimeout: 5 * time.Second,
63+
WriteTimeout: 5 * time.Second,
64+
IdleTimeout: 600 * time.Second,
65+
}
66+
67+
http.HandleFunc("/stage", stage)
68+
http.HandleFunc("/next", next)
69+
http.HandleFunc("/connect", connect)
70+
go func() {
71+
if err := server.ListenAndServe(); err != nil {
72+
panic(fmt.Errorf("failed to spawn command receiver: %s", err))
73+
}
74+
}()
75+
76+
ctx, cancel := context.WithCancel(context.Background())
77+
defer cancel()
78+
79+
defer func() {
80+
if err := os.RemoveAll(*fileName); err != nil {
81+
fmt.Fprintf(os.Stderr, "Failed to clean up %s: %s\n", *fileName, err)
82+
}
83+
}()
84+
85+
node, _, err := spawnEphemeral(ctx, *bootstrap)
86+
if err != nil {
87+
panic(fmt.Errorf("failed to spawn ephemeral node: %s", err))
88+
}
89+
90+
if err := connectToPeers(ctx, node, []string{*bootstrap}); err != nil {
91+
fmt.Fprintf(os.Stderr, "Failed to connect to root peer: %s\n", err)
92+
}
93+
94+
stageNow = 1
95+
96+
<-sig
97+
98+
if err := connectToPeers(ctx, node, targets); err != nil {
99+
fmt.Fprintf(os.Stderr, "Failed to connect to peers: %s\n", err)
100+
}
101+
102+
cid := icorepath.New(*cidArg)
103+
104+
if err := node.Dht().Provide(ctx, cid); err != nil {
105+
fmt.Fprintf(os.Stderr, "Failed to seed the resource file: %s\n", err)
106+
}
107+
108+
if err := node.Pin().Add(ctx, cid); err != nil {
109+
fmt.Fprintf(os.Stderr, "Failed to pin the resource file: %s\n", err)
110+
}
111+
112+
rootNode, err := node.Unixfs().Get(ctx, cid)
113+
114+
if err != nil {
115+
panic(fmt.Errorf("could not get file with CID: %s", err))
116+
}
117+
118+
if err := files.WriteTo(rootNode, *fileName); err != nil {
119+
panic(fmt.Errorf("could not write out the fetched CID: %s", err))
120+
}
121+
122+
if err := os.MkdirAll(*targetDir, os.ModePerm); err != nil {
123+
panic(fmt.Errorf("failed to create target directory: %s", err))
124+
}
125+
126+
if err := extractTarGz(*fileName, *targetDir); err != nil {
127+
panic(fmt.Errorf("failed to uncompress resource file: %s", err))
128+
}
129+
130+
stageNow = 2
131+
132+
<-sig
133+
}
134+
135+
func createNode(ctx context.Context, repoPath string) (*core.IpfsNode, error) {
136+
// Open the repo
137+
repo, err := fsrepo.Open(repoPath)
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
// Construct the node
143+
nodeOptions := &core.BuildCfg{
144+
Online: true,
145+
Routing: libp2p.DHTOption, // This option sets the node to be a full DHT node (both fetching and storing DHT Records)
146+
// Routing: libp2p.DHTClientOption, // This option sets the node to be a client DHT node (only fetching records)
147+
Repo: repo,
148+
}
149+
150+
return core.NewNode(ctx, nodeOptions)
151+
}
152+
153+
func spawnEphemeral(ctx context.Context, bootstrap string) (icore.CoreAPI, *core.IpfsNode, error) {
154+
if err := setupPlugins(""); err != nil {
155+
return nil, nil, err
156+
}
157+
158+
repoPath, err := createTempRepo(bootstrap)
159+
if err != nil {
160+
return nil, nil, fmt.Errorf("failed to create temp repo: %s", err)
161+
}
162+
163+
node, err := createNode(ctx, repoPath)
164+
if err != nil {
165+
return nil, nil, err
166+
}
167+
168+
api, err := coreapi.NewCoreAPI(node)
169+
170+
return api, node, err
171+
}
172+
173+
func setupPlugins(externalPluginsPath string) error {
174+
plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins"))
175+
if err != nil {
176+
return fmt.Errorf("error loading plugins: %s", err)
177+
}
178+
179+
if err := plugins.Initialize(); err != nil {
180+
return fmt.Errorf("error initializing plugins: %s", err)
181+
}
182+
183+
if err := plugins.Inject(); err != nil {
184+
return fmt.Errorf("error injecting plugins: %s", err)
185+
}
186+
187+
return nil
188+
}
189+
190+
func createTempRepo(bootstrap string) (string, error) {
191+
repoPath, err := os.MkdirTemp("", "ipfs-shell")
192+
if err != nil {
193+
return "", fmt.Errorf("failed to get temp dir: %s", err)
194+
}
195+
196+
cfg, err := config.Init(io.Discard, 2048)
197+
if err != nil {
198+
return "", err
199+
}
200+
201+
cfg.Bootstrap = append(cfg.Bootstrap, bootstrap)
202+
203+
if err := fsrepo.Init(repoPath, cfg); err != nil {
204+
return "", fmt.Errorf("failed to init ephemeral node: %s", err)
205+
}
206+
207+
return repoPath, nil
208+
}
209+
210+
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
211+
var wg sync.WaitGroup
212+
peerInfos := make(map[peer.ID]*peer.AddrInfo, len(peers))
213+
for _, addrStr := range peers {
214+
addr, err := ma.NewMultiaddr(addrStr)
215+
if err != nil {
216+
return err
217+
}
218+
pii, err := peer.AddrInfoFromP2pAddr(addr)
219+
if err != nil {
220+
return err
221+
}
222+
pi, ok := peerInfos[pii.ID]
223+
if !ok {
224+
pi = &peer.AddrInfo{ID: pii.ID}
225+
peerInfos[pi.ID] = pi
226+
}
227+
pi.Addrs = append(pi.Addrs, pii.Addrs...)
228+
}
229+
230+
wg.Add(len(peerInfos))
231+
for _, peerInfo := range peerInfos {
232+
go func(peerInfo *peer.AddrInfo) {
233+
defer wg.Done()
234+
err := ipfs.Swarm().Connect(ctx, *peerInfo)
235+
if err != nil {
236+
panic(fmt.Errorf("failed to connect to %s: %s", peerInfo.ID, err))
237+
}
238+
}(peerInfo)
239+
}
240+
wg.Wait()
241+
return nil
242+
}
243+
244+
func stage(w http.ResponseWriter, req *http.Request) {
245+
fmt.Fprintf(w, "%d", stageNow)
246+
}
247+
248+
func next(w http.ResponseWriter, req *http.Request) {
249+
sig <- struct{}{}
250+
}
251+
252+
func connect(w http.ResponseWriter, req *http.Request) {
253+
target := req.URL.Query().Get("target")
254+
targetDecoded, err := b64.StdEncoding.DecodeString(target)
255+
if err != nil {
256+
return
257+
}
258+
259+
target = string(targetDecoded)
260+
261+
targets = strings.Split(target, ",")
262+
}
263+
264+
func extractTarGz(src, dest string) error {
265+
tarFile, err := os.Open(filepath.Clean(src))
266+
if err != nil {
267+
return err
268+
}
269+
defer tarFile.Close()
270+
271+
gzipReader, err := gzip.NewReader(tarFile)
272+
if err != nil {
273+
return err
274+
}
275+
defer gzipReader.Close()
276+
277+
tarReader := tar.NewReader(gzipReader)
278+
279+
for {
280+
header, err := tarReader.Next()
281+
282+
if err == io.EOF {
283+
break
284+
}
285+
if err != nil {
286+
return err
287+
}
288+
289+
// we created the tarball ourselves so it is safe
290+
// #nosec G305
291+
target := filepath.Join(dest, header.Name)
292+
293+
switch header.Typeflag {
294+
case tar.TypeDir:
295+
// it is expected to create directory with permission 755
296+
// #nosec G301
297+
if err := os.MkdirAll(target, 0755); err != nil {
298+
return err
299+
}
300+
case tar.TypeReg:
301+
outFile, err := os.Create(filepath.Clean(target))
302+
if err != nil {
303+
return err
304+
}
305+
defer outFile.Close()
306+
307+
// we created the tarball ourselves so it is safe
308+
// #nosec G110
309+
if _, err := io.Copy(outFile, tarReader); err != nil {
310+
return err
311+
}
312+
}
313+
}
314+
315+
return nil
316+
}

0 commit comments

Comments
 (0)