diff --git a/pkg/sinker/examples/log/Makefile b/pkg/sinker/examples/log/Makefile index 0850b8dc..e94cbff6 100644 --- a/pkg/sinker/examples/log/Makefile +++ b/pkg/sinker/examples/log/Makefile @@ -10,7 +10,7 @@ image-push: build docker buildx build -t "quay.io/numaio/numaflow-go/sink-log:${TAG}" --platform linux/amd64,linux/arm64 --target log . --push .PHONY: image -imageimage: build +image: build docker build -t "quay.io/numaio/numaflow-go/sink-log:${TAG}" --target log . @if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/sink-log:${TAG}"; fi diff --git a/pkg/sinker/examples/log/main.go b/pkg/sinker/examples/log/main.go index 89f18a76..1f7c1156 100644 --- a/pkg/sinker/examples/log/main.go +++ b/pkg/sinker/examples/log/main.go @@ -20,6 +20,8 @@ func (l *logSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) fmt.Println("User Defined Sink:", string(d.Value())) id := d.ID() result = result.Append(sinksdk.ResponseOK(id)) + // if we are not able to write to sink and if we have a fallback sink configured + // we can use sinksdk.ResponseFallback(id)) to write the message to fallback sink } return result } diff --git a/pkg/sinker/options.go b/pkg/sinker/options.go index 837ee830..34dbcd81 100644 --- a/pkg/sinker/options.go +++ b/pkg/sinker/options.go @@ -15,7 +15,7 @@ func defaultOptions() *options { defaultPath := serverInfoFilePath defaultAddress := address - if os.Getenv(EnvUDContainerType) == UDContainerFallbackSink { + if os.Getenv(EnvUDContaclinerType) == UDContainerFallbackSink { defaultPath = fbServerInfoFilePath defaultAddress = fbAddress } diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index c23d73ee..d6dc09b2 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -18,8 +18,8 @@ const ( fbAddress = "/var/run/numaflow/fb-sink.sock" serverInfoFilePath = "/var/run/numaflow/sinker-server-info" fbServerInfoFilePath = "/var/run/numaflow/fb-sinker-server-info" - EnvUDContainerType = "EnvUDContainerType" - UDContainerFallbackSink = "fb-sink" + EnvUDContaclinerType = "NUMAFLOW_UD_CONTAINER_TYPE" + UDContainerFallbackSink = "fb-udsink" ) // handlerDatum implements the Datum interface and is used in the sink functions.