Skip to content

Commit

Permalink
Edgedb support (#34)
Browse files Browse the repository at this point in the history
* created driver for edgedb

* first implementation of Store interface for edgedb

* fixed missing module reference in edgeql statement

* temporarily changing module path in go.mod

* readded schema file for events object in edgedb

* fixed broken query

* fixed issue where edgedb is expecting a datetime and we're giving it nostr.Timestamp

* edgedb expects int64 and got int. Fixed that

* edgedb expects int64 and got int. Fixed that

* tags now stored as bytes

* slice of byte slices

* trying something

* simplified some code

* fixed kinds filter issue with type mismatch

* added array_unpack to certain queries

* changed event content type to optional string

* casting limit as int64 and setting default limit valuesg

* fixed broken query for tags

* forgot to cast filter defined limit as int64

* added a migration call for easier initialization of edgedb

* added a note to explain the particular error handling when performing migration

* trying to fix issue where empty tags are not being accepted by edgedb

* using the OptionalTags type

* using the OptionalTags type

* trying to fix issue

* trying something

* fixed broken edgeql query
  • Loading branch information
TheRebelOfBabylon authored Dec 5, 2024
1 parent 2a5a77b commit c246cfd
Show file tree
Hide file tree
Showing 9 changed files with 479 additions and 99 deletions.
16 changes: 16 additions & 0 deletions edgedb/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package edgedb

import (
"context"

"github.com/nbd-wtf/go-nostr"
)

// DeleteEvent implements the method of the eventstore.Store interface
func (b *EdgeDBBackend) DeleteEvent(ctx context.Context, event *nostr.Event) error {
query := "DELETE events::Event FILTER .eventId = <str>$eventId"
args := map[string]interface{}{
"eventId": event.ID,
}
return b.Client.QuerySingle(ctx, query, &Event{}, args)
}
19 changes: 19 additions & 0 deletions edgedb/edgedb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package edgedb

import "github.com/edgedb/edgedb-go"

type EdgeDBBackend struct {
*edgedb.Client
DatabaseURI string
TLSSkipVerify bool
QueryIDsLimit int
QueryAuthorsLimit int
QueryKindsLimit int
QueryTagsLimit int
QueryLimit int
}

// Close implements the Close method of the eventstore.Store interface
func (b *EdgeDBBackend) Close() {
b.Client.Close()
}
15 changes: 15 additions & 0 deletions edgedb/events.esdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module events {
type Event {
required eventId: str {
constraint exclusive;
};
required pubkey: str;
required createdAt: datetime;
required kind: int64;
tags: array<json>;
content: str;
required sig: str {
constraint exclusive;
};
}
}
73 changes: 73 additions & 0 deletions edgedb/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package edgedb

import (
"context"
"errors"

"github.com/edgedb/edgedb-go"
"github.com/fiatjaf/eventstore"
)

var _ eventstore.Store = (*EdgeDBBackend)(nil)

const (
queryLimit = 100
queryIDsLimit = 500
queryAuthorsLimit = 500
queryKindsLimit = 10
queryTagsLimit = 10
)

var (
initialMigration = `CREATE MIGRATION {
CREATE MODULE events IF NOT EXISTS;
CREATE TYPE events::Event {
CREATE PROPERTY content: std::str;
CREATE REQUIRED PROPERTY createdAt: std::datetime;
CREATE REQUIRED PROPERTY eventId: std::str {
CREATE CONSTRAINT std::exclusive;
};
CREATE REQUIRED PROPERTY kind: std::int64;
CREATE REQUIRED PROPERTY pubkey: std::str;
CREATE REQUIRED PROPERTY sig: std::str {
CREATE CONSTRAINT std::exclusive;
};
CREATE PROPERTY tags: array<std::json>;
};
};`
)

// Init implements the Init method of the eventstore.Store inteface.
// It establishes the connection with Edgedb
func (b *EdgeDBBackend) Init() error {
opts := edgedb.Options{}
if b.TLSSkipVerify {
opts.TLSOptions = edgedb.TLSOptions{SecurityMode: edgedb.TLSModeInsecure}
}
dbConn, err := edgedb.CreateClientDSN(context.Background(), b.DatabaseURI, opts)
if err != nil {
return err
}
// perform initial migration. NOTE: we check for SchemaError since that is the type of error returned when there's a duplicate schema. Kind of dumb
var dbErr edgedb.Error
if err := dbConn.Execute(context.Background(), initialMigration); err != nil && errors.As(err, &dbErr) && !dbErr.Category(edgedb.SchemaError) {
return err
}
b.Client = dbConn
if b.QueryAuthorsLimit == 0 {
b.QueryAuthorsLimit = queryAuthorsLimit
}
if b.QueryLimit == 0 {
b.QueryLimit = queryLimit
}
if b.QueryIDsLimit == 0 {
b.QueryIDsLimit = queryIDsLimit
}
if b.QueryKindsLimit == 0 {
b.QueryKindsLimit = queryKindsLimit
}
if b.QueryTagsLimit == 0 {
b.QueryTagsLimit = queryTagsLimit
}
return nil
}
145 changes: 145 additions & 0 deletions edgedb/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package edgedb

import (
"context"
"errors"
"fmt"
"strings"

"github.com/nbd-wtf/go-nostr"
)

var (
ErrTooManyIDs = errors.New("too many ids")
ErrTooManyAuthors = errors.New("too many authors")
ErrTooManyKinds = errors.New("too many kinds")
ErrEmptyTagSet = errors.New("empty tag set")
ErrTooManyTagValues = errors.New("too many tag values")
)

// QueryEvents is an implementation of the QueryEvents method of the eventstore.Store interfac for edgedb
func (b *EdgeDBBackend) QueryEvents(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) {
query, args, err := b.queryEventsEdgeql(filter, false)
if err != nil {
return nil, err
}
var events []Event
if err := b.Query(ctx, query, &events, args); err != nil {
return nil, fmt.Errorf("failed to fetch events using query %s: %w", query, err)
}
ch := make(chan *nostr.Event)
go func() {
defer close(ch)
for _, event := range events {
e, err := EdgeDBEventToNostrEvent(event)
if err != nil {
panic(fmt.Errorf("failed to fetch events using query %s: %w", query, err))
}
select {
case ch <- e:
case <-ctx.Done():
return
}
}
}()
return ch, nil
}

// queryEventsEdgeql builds the edgeql query based on the applied filters
func (b *EdgeDBBackend) queryEventsEdgeql(filter nostr.Filter, doCount bool) (string, map[string]interface{}, error) {
var (
conditions []string
query string
)
args := map[string]interface{}{}
if len(filter.IDs) > 0 {
if len(filter.IDs) > b.QueryIDsLimit {
return query, args, ErrTooManyIDs
}
conditions = append(conditions, `events::Event.eventId IN array_unpack(<array<str>>$ids)`)
args["ids"] = filter.IDs
}

if len(filter.Authors) > 0 {
if len(filter.Authors) > b.QueryAuthorsLimit {
return query, args, ErrTooManyAuthors
}
conditions = append(conditions, `events::Event.pubkey IN array_unpack(<array<str>>$authors)`)
args["authors"] = filter.Authors
}

if len(filter.Kinds) > 0 {
if len(filter.Kinds) > b.QueryKindsLimit {
return query, args, ErrTooManyKinds
}
conditions = append(conditions, `events::Event.kind IN array_unpack(<array<int64>>$kinds)`)
int64Kinds := []int64{}
for _, k := range filter.Kinds {
int64Kinds = append(int64Kinds, int64(k))
}
args["kinds"] = int64Kinds
}
/*
SELECT events::Event {*} FILTER (
with ts := (
for tag in array_unpack(.tags) UNION (
SELECT tag[1] if count(json_array_unpack(tag)) > 1 else <json>'' FILTER tag[0] = <json>'x'
)
)
SELECT EXISTS (SELECT ts INTERSECT {<json>'y', <json>'z'})
);
*/
for letter, values := range filter.Tags {
if len(values) == 0 {
return query, args, ErrEmptyTagSet
}
if len(values) > b.QueryTagsLimit {
return query, args, ErrTooManyTagValues
}
jsonSet := func(vals []string) string {
var set []string
for _, val := range vals {
set = append(set, "<json>'"+val+"'")
}
return strings.Join(set, ", ")
}
conditions = append(conditions, fmt.Sprintf(`(
with ts := (
for tag in array_unpack(events::Event.tags) UNION (
SELECT tag[1] if count(json_array_unpack(tag)) > 1 else <json>'' FILTER (tag[0] if count(json_array_unpack(tag)) > 0 else <json>'') = <json>'%s'
)
)
SELECT EXISTS (SELECT ts INTERSECT {%s})
)`, letter, jsonSet(values)))

}

if filter.Since != nil {
conditions = append(conditions, `events::Event.createdAt >= <datetime>$since`)
args["since"] = filter.Since.Time()
}
if filter.Until != nil {
conditions = append(conditions, `events::Event.createdAt <= <datetime>$until`)
args["until"] = filter.Until.Time()
}
if filter.Search != "" {
conditions = append(conditions, `events::Event.content LIKE <str>$search`)
args["search"] = "%" + strings.ReplaceAll(filter.Search, "%", `\%`) + "%"
}
query = "SELECT events::Event {*}"
if doCount {
query = "SELECT count(events::Event)"
}
if len(conditions) > 0 {
query += " FILTER " + strings.Join(conditions, " AND ")
}
if !doCount {
query += " ORDER BY events::Event.createdAt DESC"
}
query += " LIMIT <int64>$limit"
args["limit"] = int64(filter.Limit)
if filter.Limit < 1 || filter.Limit > b.QueryLimit {
args["limit"] = int64(b.QueryLimit)
}
return query, args, nil
}
31 changes: 31 additions & 0 deletions edgedb/save.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package edgedb

import (
"context"
"encoding/json"

"github.com/edgedb/edgedb-go"
"github.com/nbd-wtf/go-nostr"
)

func (b *EdgeDBBackend) SaveEvent(ctx context.Context, event *nostr.Event) error {
tagsBytes := [][]byte{}
for _, t := range event.Tags {
tagBytes, err := json.Marshal(t)
if err != nil {
return err
}
tagsBytes = append(tagsBytes, tagBytes)
}
query := "INSERT events::Event { eventId := <str>$eventId, pubkey := <str>$pubkey, createdAt := <datetime>$createdAt, kind := <int64>$kind, tags := <array<json>>$tags, content := <str>$content, sig := <str>$sig }"
args := map[string]interface{}{
"eventId": event.ID,
"pubkey": event.PubKey,
"createdAt": edgedb.NewOptionalDateTime(event.CreatedAt.Time()),
"kind": int64(event.Kind),
"tags": tagsBytes,
"content": event.Content,
"sig": event.Sig,
}
return b.Client.QuerySingle(ctx, query, &Event{}, args)
}
63 changes: 63 additions & 0 deletions edgedb/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package edgedb

import (
"encoding/json"

"github.com/edgedb/edgedb-go"
"github.com/nbd-wtf/go-nostr"
)

type Event struct {
ID edgedb.UUID `edgedb:"id"`
EventID string `edgedb:"eventId"`
Pubkey string `edgedb:"pubkey"`
CreatedAt edgedb.OptionalDateTime `edgedb:"createdAt"`
Kind int64 `edgedb:"kind"`
Tags [][]byte `edgedb:"tags"`
Content edgedb.OptionalStr `edgedb:"content"`
Sig string `edgedb:"sig"`
}

// NostrEventToEdgeDBEvent converts the event from the nostr.Event datatype to edgedb.Event
func NostrEventToEdgeDBEvent(event *nostr.Event) (Event, error) {
tagsBytes := [][]byte{}
for _, t := range event.Tags {
tagBytes, err := json.Marshal(t)
if err != nil {
return Event{}, err
}
tagsBytes = append(tagsBytes, tagBytes)
}
return Event{
EventID: event.ID,
Pubkey: event.PubKey,
CreatedAt: edgedb.NewOptionalDateTime(event.CreatedAt.Time()),
Kind: int64(event.Kind),
Tags: tagsBytes, // NewOptionalTags(tagsBytes),
Content: edgedb.NewOptionalStr(event.Content),
Sig: event.Sig,
}, nil
}

// EdgeDBEventToNostrEvent converts the event from the edgedb.Event datatype to nostr.Event
func EdgeDBEventToNostrEvent(event Event) (*nostr.Event, error) {
tags := nostr.Tags{}
for _, tagBytes := range event.Tags {
var tag nostr.Tag
if err := json.Unmarshal(tagBytes, &tag); err != nil {
return nil, err
}
tags = append(tags, tag)
}
createdAt, _ := event.CreatedAt.Get()
content, _ := event.Content.Get()
return &nostr.Event{
ID: event.EventID,
PubKey: event.Pubkey,
CreatedAt: nostr.Timestamp(createdAt.Unix()),
Kind: int(event.Kind),
Tags: tags,
Content: content,
Sig: event.Sig,
}, nil
}
Loading

0 comments on commit c246cfd

Please sign in to comment.