Skip to content

Commit

Permalink
Merge pull request #1 from edgulczynski/expose-publish-settings
Browse files Browse the repository at this point in the history
Expose publish settings
  • Loading branch information
edgulczynski authored Dec 12, 2019
2 parents aca1015 + 418505c commit 4997298
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/NYTimes/gizmo
module github.com/edgulczynski/gizmo

go 1.12

Expand Down
14 changes: 12 additions & 2 deletions pubsub/gcp/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gcp

import "github.com/kelseyhightower/envconfig"
import (
"github.com/kelseyhightower/envconfig"
gpubsub "cloud.google.com/go/pubsub"
)

// Config holds common credentials and config values for
// working with GCP PubSub.
Expand All @@ -9,8 +12,15 @@ type Config struct {

// For publishing
Topic string `envconfig:"GCP_PUBSUB_TOPIC"`

// Batch settings for GCP publisher
// See: https://godoc.org/cloud.google.com/go/pubsub#PublishSettings
// Note: this config will not allow you to go lower than the
// default PublishSettings values
PublishSettings gpubsub.PublishSettings

// For subscribing
Subscription string `envconfig:"GCP_PUBSUB_SUBSCRIPTION"`
Subscription string `envconfig:"GCP_PUBSUB_SUBSCRIPTION"`
}

// LoadConfigFromEnv will attempt to load a PubSub config
Expand Down
32 changes: 22 additions & 10 deletions pubsub/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,6 @@ func (s *Subscriber) Stop() error {
return nil
}

// SetReceiveSettings sets the ReceivedSettings on the google pubsub Subscription.
// Should be called before Start().
func (s *Subscriber) SetReceiveSettings(settings gpubsub.ReceiveSettings) {
s.sub.(subscriptionImpl).Sub.ReceiveSettings = settings
}

// SubMessage pubsub implementation of pubsub.SubscriberMessage.
type SubMessage struct {
msg message
Expand Down Expand Up @@ -142,10 +136,25 @@ func NewPublisher(ctx context.Context, cfg Config, opts ...option.ClientOption)
if err != nil {
return nil, err
}

return &publisher{
topic: c.Topic(cfg.Topic),
}, nil
t := c.Topic(cfg.Topic)
// Update PublishSettings from cfg.PublishSettings
// Never set thresholds lower than the defaults
if cfg.PublishSettings.DelayThreshold > t.PublishSettings.DelayThreshold {
t.PublishSettings.DelayThreshold = cfg.PublishSettings.DelayThreshold
}
if cfg.PublishSettings.CountThreshold > t.PublishSettings.CountThreshold {
t.PublishSettings.CountThreshold = cfg.PublishSettings.CountThreshold
}
if cfg.PublishSettings.ByteThreshold > t.PublishSettings.ByteThreshold {
t.PublishSettings.ByteThreshold = cfg.PublishSettings.ByteThreshold
}
if cfg.PublishSettings.NumGoroutines > t.PublishSettings.NumGoroutines {
t.PublishSettings.NumGoroutines = cfg.PublishSettings.NumGoroutines
}
if cfg.PublishSettings.Timeout > t.PublishSettings.Timeout {
t.PublishSettings.Timeout = cfg.PublishSettings.Timeout
}
return &publisher{t}, nil
}

// Publish will marshal the proto message and publish it to GCP pubsub.
Expand Down Expand Up @@ -200,6 +209,7 @@ func (p *publisher) PublishMultiRaw(ctx context.Context, keys []string, messages
return nil
}


// interfaces and types to make this more testable
type (
subscription interface {
Expand All @@ -218,6 +228,8 @@ type (
subscriptionImpl struct {
Sub *gpubsub.Subscription
}


)

func (m messageImpl) ID() string {
Expand Down

0 comments on commit 4997298

Please sign in to comment.