Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions .trunk/trunk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
# To learn more about the format of this file, see https://docs.trunk.io/reference/trunk-yaml
version: 0.1
cli:
version: 1.24.0
version: 1.25.0

# Trunk provides extensibility via plugins. (https://docs.trunk.io/plugins)
plugins:
sources:
- id: trunk
ref: v1.7.0
ref: v1.7.2
uri: https://github.com/trunk-io/plugins

# Many linters and tools depend on runtimes - configure them here. (https://docs.trunk.io/runtimes)
Expand All @@ -26,23 +26,24 @@ lint:
- contrib/**
- protos/pb/pb.pb.go
enabled:
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected]
- [email protected].445
- [email protected].467
- [email protected]
- git-diff-check
- [email protected]
- [email protected]
- [email protected]
- osv-scanner@2.0.3
- osv-scanner@2.2.2
- [email protected]
- [email protected].0
- renovate@41.2.0
- shellcheck@0.10.0
- [email protected].2
- renovate@41.91.3
- shellcheck@0.11.0
- [email protected]
- tflint@0.58.0
- trufflehog@3.89.2
- tflint@0.59.1
- trufflehog@3.90.5
- [email protected]
actions:
enabled:
Expand Down
2 changes: 0 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v250/protos/api"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
_ "github.com/dgraph-io/gqlparser/v2/validator/rules" // make gql validator init() all rules
"github.com/dgraph-io/ristretto/v2/z"
"github.com/hypermodeinc/dgraph/v25/audit"
Expand Down Expand Up @@ -467,7 +466,6 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, closer *z.Closer) {

s := grpc.NewServer(opt...)
api.RegisterDgraphServer(s, &edgraph.Server{})
apiv2.RegisterDgraphServer(s, &edgraph.ServerV25{})
hapi.RegisterHealthServer(s, health.NewServer())
worker.RegisterZeroProxyServer(s)

Expand Down
30 changes: 15 additions & 15 deletions dgraph/cmd/dgraphimport/import_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ import (

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v250"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
"github.com/dgraph-io/dgo/v250/protos/api"
"github.com/dgraph-io/ristretto/v2/z"

"github.com/golang/glog"
"golang.org/x/sync/errgroup"
)

// newClient creates a new import client with the specified endpoint and gRPC options.
func newClient(connectionString string) (apiv2.DgraphClient, error) {
func newClient(connectionString string) (api.DgraphClient, error) {
dg, err := dgo.Open(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", connectionString, err)
}

glog.Infof("[import] Successfully connected to Dgraph endpoint: %s", connectionString)
return dg.GetAPIv2Client()[0], nil
return dg.GetAPIClients()[0], nil
}

func Import(ctx context.Context, connectionString string, bulkOutDir string) error {
Expand All @@ -48,9 +48,9 @@ func Import(ctx context.Context, connectionString string, bulkOutDir string) err
}

// initiateSnapshotStream initiates a snapshot stream session with the Dgraph server.
func initiateSnapshotStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.UpdateExtSnapshotStreamingStateResponse, error) {
func initiateSnapshotStream(ctx context.Context, dc api.DgraphClient) (*api.UpdateExtSnapshotStreamingStateResponse, error) {
glog.Info("[import] Initiating external snapshot stream")
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
req := &api.UpdateExtSnapshotStreamingStateRequest{
Start: true,
}
resp, err := dc.UpdateExtSnapshotStreamingState(ctx, req)
Expand All @@ -65,7 +65,7 @@ func initiateSnapshotStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.
// streamSnapshot takes a p directory and a set of group IDs and streams the data from the
// p directory to the corresponding group IDs. It first scans the provided directory for
// subdirectories named with numeric group IDs.
func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string, groups []uint32) error {
func streamSnapshot(ctx context.Context, dc api.DgraphClient, baseDir string, groups []uint32) error {
glog.Infof("[import] Starting to stream snapshot from directory: %s", baseDir)

errG, errGrpCtx := errgroup.WithContext(ctx)
Expand All @@ -90,7 +90,7 @@ func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string,
// If errors occurs during streaming of the external snapshot, we drop all the data and
// go back to ensure a clean slate and the cluster remains in working state.
glog.Info("[import] dropping all the data and going back to clean slate")
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
req := &api.UpdateExtSnapshotStreamingStateRequest{
Start: false,
Finish: true,
DropData: true,
Expand All @@ -104,7 +104,7 @@ func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string,
}

glog.Info("[import] Completed streaming external snapshot")
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
req := &api.UpdateExtSnapshotStreamingStateRequest{
Start: false,
Finish: true,
DropData: false,
Expand All @@ -119,7 +119,7 @@ func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string,

// streamSnapshotForGroup handles the actual data streaming process for a single group.
// It opens the BadgerDB at the specified directory and streams all data to the server.
func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir string, groupId uint32) error {
func streamSnapshotForGroup(ctx context.Context, dc api.DgraphClient, pdir string, groupId uint32) error {
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)

// Initialize stream with the server
Expand Down Expand Up @@ -152,7 +152,7 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str

// Send group ID as the first message in the stream
glog.Infof("[import] Sending request for streaming external snapshot for group ID [%v]", groupId)
groupReq := &apiv2.StreamExtSnapshotRequest{GroupId: groupId}
groupReq := &api.StreamExtSnapshotRequest{GroupId: groupId}
if err := out.Send(groupReq); err != nil {
return fmt.Errorf("failed to send request for streaming external snapshot for group ID [%v] to the server: %w",
groupId, err)
Expand All @@ -171,13 +171,13 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
// streamBadger runs a BadgerDB stream to send key-value pairs to the specified group.
// It creates a new stream at the maximum sequence number and sends the data to the specified group.
// It also sends a final 'done' signal to mark completion.
func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExtSnapshotClient, groupId uint32) error {
func streamBadger(ctx context.Context, ps *badger.DB, out api.Dgraph_StreamExtSnapshotClient, groupId uint32) error {
stream := ps.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "[import] Sending external snapshot to group [" + fmt.Sprintf("%d", groupId) + "]"
stream.KeyToList = nil
stream.Send = func(buf *z.Buffer) error {
p := &apiv2.StreamPacket{Data: buf.Bytes()}
if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
p := &api.StreamPacket{Data: buf.Bytes()}
if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send data chunk: %w", err)
}
return nil
Expand All @@ -190,9 +190,9 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt

// Send the final 'done' signal to mark completion
glog.Infof("[import] Sending completion signal for group [%d]", groupId)
done := &apiv2.StreamPacket{Done: true}
done := &api.StreamPacket{Done: true}

if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: done}); err != nil && !errors.Is(err, io.EOF) {
if err := out.Send(&api.StreamExtSnapshotRequest{Pkt: done}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
}

Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/mcp/mcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func NewMCPServer(connectionString string, readOnly bool) (*server.MCPServer, er
if err != nil {
return mcp.NewToolResultErrorFromErr("Error opening connection with Dgraph Alpha", err), nil
}
if err = conn.SetSchema(ctx, dgo.RootNamespace, schema); err != nil {
if err = conn.SetSchema(ctx, schema); err != nil {
return mcp.NewToolResultErrorFromErr("Schema alteration failed", err), nil
}

Expand Down
Loading