Skip to content

Commit 9bca2d8

Browse files
committed
remove lip2p key, use transactions
1 parent 85445a5 commit 9bca2d8

File tree

1 file changed

+109
-118
lines changed

1 file changed

+109
-118
lines changed

cmd/migrate-curio/migrate.go

+109-118
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"crypto/rand"
54
"database/sql"
65
"encoding/json"
76
"errors"
@@ -34,7 +33,6 @@ import (
3433
"github.com/ipfs/go-datastore"
3534
"github.com/ipfs/go-datastore/namespace"
3635
cbor "github.com/ipfs/go-ipld-cbor"
37-
"github.com/libp2p/go-libp2p/core/crypto"
3836
"github.com/mitchellh/go-homedir"
3937
"github.com/urfave/cli/v2"
4038
"golang.org/x/net/context"
@@ -163,11 +161,6 @@ func migrate(cctx *cli.Context, repoDir string) error {
163161
return xerrors.Errorf("failed to migrate DDO deals: %w", err)
164162
}
165163

166-
// Migrate libp2p key
167-
if err := generateNewKeys(ctx, maddr, hdb); err != nil {
168-
return xerrors.Errorf("failed to migrate libp2p key: %w", err)
169-
}
170-
171164
return nil
172165
}
173166

@@ -191,7 +184,11 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
191184

192185
deals := append(aDeals, cDeals...)
193186

194-
for _, deal := range deals {
187+
for i, deal := range deals {
188+
if i > 0 && i%100 == 0 {
189+
fmt.Printf("Migrating Boost Deals: %d / %d (%0.2f%%)\n", i, len(deals), float64(i)/float64(len(deals))*100)
190+
}
191+
195192
llog := log.With("Boost Deal", deal.DealUuid.String())
196193
// Skip deals which are before add piece
197194
if deal.Checkpoint < dealcheckpoints.AddedPiece {
@@ -269,62 +266,69 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
269266
return fmt.Errorf("deal: %s: failed to marshal headers: %s", deal.DealUuid.String(), err)
270267
}
271268

272-
// Add deal to HarmonyDB
273-
if !a {
274-
_, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deals (uuid, sp_id, signed_proposal_cid,
269+
_, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
270+
// Add deal to HarmonyDB
271+
if !a {
272+
_, err = tx.Exec(`INSERT INTO market_mk12_deals (uuid, sp_id, signed_proposal_cid,
275273
proposal_signature, proposal, piece_cid,
276274
piece_size, offline, verified, start_epoch, end_epoch,
277275
client_peer_id, fast_retrieval, announce_to_ipni, url, url_headers, chain_deal_id, publish_cid, created_at)
278276
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
279277
ON CONFLICT (uuid) DO NOTHING`,
280-
deal.DealUuid.String(), mid, sProp.String(), sigByte, propJson, prop.PieceCID.String(),
281-
prop.PieceSize, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(),
282-
deal.FastRetrieval, deal.AnnounceToIPNI, tInfo.URL, headers, int64(deal.ChainDealID), deal.PublishCID.String(), deal.CreatedAt)
283-
284-
if err != nil {
285-
return fmt.Errorf("deal: %s: failed to add the deal to harmonyDB: %w", deal.DealUuid.String(), err)
286-
}
287-
288-
// Mark deal added to harmonyDB
289-
_, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String())
290-
if err != nil {
291-
return fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.DealUuid.String(), err)
292-
}
293-
}
294-
295-
if !b {
296-
// Add LID details to pieceDeal in HarmonyDB
297-
_, err = hdb.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
298-
deal.DealUuid.String(), prop.PieceCID.String(), true, mid, deal.SectorID, deal.Offset, prop.PieceSize, deal.NBytesReceived, false)
299-
if err != nil {
300-
return fmt.Errorf("deal: %s: failed to update piece metadata and piece deal: %w", deal.DealUuid.String(), err)
278+
deal.DealUuid.String(), mid, sProp.String(), sigByte, propJson, prop.PieceCID.String(),
279+
prop.PieceSize, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(),
280+
deal.FastRetrieval, deal.AnnounceToIPNI, tInfo.URL, headers, int64(deal.ChainDealID), deal.PublishCID.String(), deal.CreatedAt)
281+
282+
if err != nil {
283+
return false, fmt.Errorf("deal: %s: failed to add the deal to harmonyDB: %w", deal.DealUuid.String(), err)
284+
}
285+
286+
// Mark deal added to harmonyDB
287+
_, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String())
288+
if err != nil {
289+
return false, fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.DealUuid.String(), err)
290+
}
301291
}
302292

303-
// Mark deal added to pieceDeal in HarmonyDB
304-
_, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String())
305-
if err != nil {
306-
return fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.DealUuid.String(), err)
293+
if !b {
294+
// Add LID details to pieceDeal in HarmonyDB
295+
_, err = tx.Exec(`SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
296+
deal.DealUuid.String(), prop.PieceCID.String(), true, mid, deal.SectorID, deal.Offset, prop.PieceSize, deal.NBytesReceived, false)
297+
if err != nil {
298+
return false, fmt.Errorf("deal: %s: failed to update piece metadata and piece deal: %w", deal.DealUuid.String(), err)
299+
}
300+
301+
// Mark deal added to pieceDeal in HarmonyDB
302+
_, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String())
303+
if err != nil {
304+
return false, fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.DealUuid.String(), err)
305+
}
307306
}
308-
}
309307

310-
if !c {
311-
var proof abi.RegisteredSealProof
312-
err = hdb.QueryRow(ctx, `SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof)
313-
if err != nil {
314-
return fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err)
315-
}
308+
if !c {
309+
var proof abi.RegisteredSealProof
310+
err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof)
311+
if err != nil {
312+
return false, fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err)
313+
}
316314

317-
// Add deal to mk12 pipeline in Curio for indexing and announcement
318-
_, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline,
315+
// Add deal to mk12 pipeline in Curio for indexing and announcement
316+
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline,
319317
after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset,
320318
sealed, should_index, indexing_created_at, announce)
321319
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`,
322-
deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline,
323-
true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true)
324-
if err != nil {
325-
return fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err)
320+
deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline,
321+
true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true)
322+
if err != nil {
323+
return false, fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err)
324+
}
326325
}
326+
return true, nil
327+
}, harmonydb.OptionRetry())
328+
if err != nil {
329+
return err
327330
}
331+
328332
}
329333

330334
return nil
@@ -366,7 +370,10 @@ func migrateLegacyDeals(ctx context.Context, full v1api.FullNode, activeSectors
366370
return err
367371
}
368372

369-
for _, deal := range legacyDeals {
373+
for i, deal := range legacyDeals {
374+
if i > 0 && i%100 == 0 {
375+
fmt.Printf("Migrating Legacy Deals: %d / %d (%0.2f%%)\n", i, len(legacyDeals), float64(i)/float64(len(legacyDeals))*100)
376+
}
370377
llog := log.With("Boost Deal", deal.ProposalCid.String())
371378
// Skip deals which do not have chain deal ID
372379
if deal.DealID == 0 {
@@ -422,7 +429,7 @@ func migrateLegacyDeals(ctx context.Context, full v1api.FullNode, activeSectors
422429

423430
prop := deal.ClientDealProposal.Proposal
424431

425-
_, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deals (signed_proposal_cid, sp_id, client_peer_id,
432+
_, err = hdb.Exec(ctx, `INSERT INTO signed_proposal_cid (signed_proposal_cid, sp_id, client_peer_id,
426433
proposal_signature, proposal, piece_cid,
427434
piece_size, verified, start_epoch, end_epoch,
428435
publish_cid, chain_deal_id, fast_retrieval, created_at, sector_num)
@@ -459,7 +466,10 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
459466
return fmt.Errorf("failed to get all DDO deals: %w", err)
460467
}
461468

462-
for _, deal := range deals {
469+
for i, deal := range deals {
470+
if i > 0 && i%100 == 0 {
471+
fmt.Printf("Migrating DDO Deals: %d / %d (%0.2f%%)\n", i, len(deals), float64(i)/float64(len(deals))*100)
472+
}
463473
llog := log.With("Boost Deal", deal.ID.String())
464474
if deal.Err != "" && deal.Retry == types.DealRetryFatal {
465475
llog.Infow("Skipping as deal retry is fatal")
@@ -505,86 +515,67 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
505515
continue
506516
}
507517

508-
if !a {
509-
// Add DDO deal to harmonyDB
510-
_, err = hdb.Exec(ctx, `INSERT INTO market_direct_deals (uuid, sp_id, created_at, client, offline, verified,
518+
_, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
519+
if !a {
520+
// Add DDO deal to harmonyDB
521+
_, err = tx.Exec(`INSERT INTO market_direct_deals (uuid, sp_id, created_at, client, offline, verified,
511522
start_epoch, end_epoch, allocation_id, piece_cid, piece_size, fast_retrieval, announce_to_ipni)
512523
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
513524
ON CONFLICT (uuid) DO NOTHING`,
514-
deal.ID.String(), mid, deal.CreatedAt, deal.Client.String(), true, true, deal.StartEpoch, deal.EndEpoch, deal.AllocationID,
515-
deal.PieceCID.String(), deal.PieceSize, true, true)
516-
517-
if err != nil {
518-
return fmt.Errorf("deal: %s: failed to add the DDO deal to harmonyDB: %w", deal.ID.String(), err)
525+
deal.ID.String(), mid, deal.CreatedAt, deal.Client.String(), true, true, deal.StartEpoch, deal.EndEpoch, deal.AllocationID,
526+
deal.PieceCID.String(), deal.PieceSize, true, true)
527+
528+
if err != nil {
529+
return false, fmt.Errorf("deal: %s: failed to add the DDO deal to harmonyDB: %w", deal.ID.String(), err)
530+
}
531+
532+
// Mark deal added to harmonyDB
533+
_, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String())
534+
if err != nil {
535+
return false, fmt.Errorf("deal: %s: failed to mark DDO deal migrated: %w", deal.ID.String(), err)
536+
}
519537
}
520538

521-
// Mark deal added to harmonyDB
522-
_, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String())
523-
if err != nil {
524-
return fmt.Errorf("deal: %s: failed to mark DDO deal migrated: %w", deal.ID.String(), err)
525-
}
526-
}
527-
528-
if !b {
529-
// Add LID details to pieceDeal in HarmonyDB
530-
_, err = hdb.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
531-
deal.ID.String(), deal.PieceCID.String(), false, mid, deal.SectorID, deal.Offset, deal.PieceSize, deal.InboundFileSize, false)
532-
if err != nil {
533-
return fmt.Errorf("deal: %s: failed to update piece metadata and piece deal for DDO deal %s: %w", deal.ID.String(), deal.ID.String(), err)
539+
if !b {
540+
// Add LID details to pieceDeal in HarmonyDB
541+
_, err = tx.Exec(`SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
542+
deal.ID.String(), deal.PieceCID.String(), false, mid, deal.SectorID, deal.Offset, deal.PieceSize, deal.InboundFileSize, false)
543+
if err != nil {
544+
return false, fmt.Errorf("deal: %s: failed to update piece metadata and piece deal for DDO deal %s: %w", deal.ID.String(), deal.ID.String(), err)
545+
}
546+
547+
// Mark deal added to pieceDeal in HarmonyDB
548+
_, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String())
549+
if err != nil {
550+
return false, fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.ID.String(), err)
551+
}
534552
}
535553

536-
// Mark deal added to pieceDeal in HarmonyDB
537-
_, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String())
538-
if err != nil {
539-
return fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.ID.String(), err)
540-
}
541-
}
554+
// TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals
555+
if !c {
556+
var proof abi.RegisteredSealProof
557+
err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof)
558+
if err != nil {
559+
return false, fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err)
560+
}
542561

543-
// TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals
544-
if !c {
545-
var proof abi.RegisteredSealProof
546-
err = hdb.QueryRow(ctx, `SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof)
547-
if err != nil {
548-
return fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err)
549-
}
550-
551-
// Add deal to mk12 pipeline in Curio for indexing and announcement
552-
_, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline,
562+
// Add deal to mk12 pipeline in Curio for indexing and announcement
563+
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline,
553564
after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset,
554565
sealed, should_index, indexing_created_at, announce)
555566
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`,
556-
deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true,
557-
true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true)
558-
if err != nil {
559-
return fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err)
567+
deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true,
568+
true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true)
569+
if err != nil {
570+
return false, fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err)
571+
}
560572
}
573+
return true, nil
574+
}, harmonydb.OptionRetry())
575+
if err != nil {
576+
return err
561577
}
562578
}
563579

564580
return nil
565581
}
566-
567-
func generateNewKeys(ctx context.Context, maddr address.Address, hdb *harmonydb.DB) error {
568-
569-
mid, err := address.IDFromAddress(maddr)
570-
if err != nil {
571-
return err
572-
}
573-
574-
pk, _, err := crypto.GenerateEd25519Key(rand.Reader)
575-
if err != nil {
576-
return fmt.Errorf("generating private key: %w", err)
577-
}
578-
579-
kbytes, err := crypto.MarshalPrivateKey(pk)
580-
if err != nil {
581-
return fmt.Errorf("marshaling private key: %w", err)
582-
}
583-
584-
_, err = hdb.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, kbytes)
585-
if err != nil {
586-
return fmt.Errorf("inserting private key into libp2p table: %w", err)
587-
}
588-
589-
return nil
590-
}

0 commit comments

Comments
 (0)