Skip to content

Commit

Permalink
Fix streaming client for streaming_sink resource (#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
emerkle826 authored Jan 24, 2025
1 parent c3bfecf commit f528dc0
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
12 changes: 6 additions & 6 deletions internal/provider/resource_streaming_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"regexp"
"strings"

Expand Down Expand Up @@ -129,7 +130,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
}

astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
streamingClientv3 := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)

tenantName := resourceData.Get("tenant_name").(string)
sinkName := resourceData.Get("sink_name").(string)
Expand Down Expand Up @@ -202,7 +203,7 @@ type SinkResponse struct {

func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
streamingClientv3 := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)

tenantName := resourceData.Get("tenant_name").(string)
sinkName := resourceData.Get("sink_name").(string)
Expand Down Expand Up @@ -249,7 +250,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc

func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
streamingClientv3 := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)

rawRegion := resourceData.Get("region").(string)
region := strings.ReplaceAll(rawRegion, "-", "")
Expand Down Expand Up @@ -285,9 +286,8 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
return diag.FromErr(fmt.Errorf("failed to request pulsar clusters: %w", err))
}

var streamingClusters StreamingClusters
if err = json.Unmarshal(streamingClustersResponse.Body, &streamingClusters); err != nil {
return diag.FromErr(fmt.Errorf("failed to read pulsar clusters: %w", err))
if streamingClustersResponse.StatusCode() != http.StatusOK {
return diag.FromErr(fmt.Errorf("failed to read pulsar clusters. Status code: %s, msg:\n%s", streamingClustersResponse.Status(), string(streamingClustersResponse.Body)))
}

pulsarCluster := getPulsarCluster("", cloudProvider, region, "")
Expand Down
5 changes: 5 additions & 0 deletions internal/provider/resource_streaming_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ type StreamingClusters []struct {
JvmVersion string `json:"jvmVersion"`
PulsarVersion string `json:"pulsarVersion"`
Email string `json:"Email"`
UserMetricsUrl string `json:"userMetricsUrl"`
PulsarInstance string `json:"pulsarInstance"`
PulsarClusterDNS string `json:"pulsarClusterDNS"`
ClusterType string `json:"clusterType"`
AzType string `json:"azType"`
}

func resourceStreamingTenantUpdate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
Expand Down

0 comments on commit f528dc0

Please sign in to comment.