@@ -57,9 +57,6 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
57
57
return nil , err
58
58
}
59
59
60
- timeField , toOk := jsonData ["timeField" ].(string )
61
- timeOutputFormat , tofOk := jsonData ["timeOutputFormat" ].(string )
62
-
63
60
logLevelField , ok := jsonData ["logLevelField" ].(string )
64
61
if ! ok {
65
62
logLevelField = ""
@@ -74,6 +71,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
74
71
if ! ok {
75
72
index = ""
76
73
}
74
+ // XXX : Legacy check, should not happen ?
77
75
if index == "" {
78
76
index = settings .Database
79
77
}
@@ -92,18 +90,11 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
92
90
maxConcurrentShardRequests = 256
93
91
}
94
92
95
- if ! toOk || ! tofOk {
96
- timeField , timeOutputFormat , err = GetTimestampFieldInfos (index , settings .URL , httpCli )
97
- if nil != err {
98
- return nil , err
99
- }
100
- }
101
-
102
93
configuredFields := es.ConfiguredFields {
103
- TimeField : timeField ,
104
- TimeOutputFormat : timeOutputFormat ,
105
94
LogLevelField : logLevelField ,
106
95
LogMessageField : logMessageField ,
96
+ TimeField : "" ,
97
+ TimeOutputFormat : "" ,
107
98
}
108
99
109
100
model := es.DatasourceInfo {
@@ -113,16 +104,62 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
113
104
Database : index ,
114
105
MaxConcurrentShardRequests : int64 (maxConcurrentShardRequests ),
115
106
ConfiguredFields : configuredFields ,
107
+ ReadyStatus : make (chan es.ReadyStatus , 1 ),
108
+ ShouldInit : true ,
116
109
}
117
- return & QuickwitDatasource {dsInfo : model }, nil
110
+
111
+ ds := & QuickwitDatasource {dsInfo : model }
112
+
113
+ // Create an initialization goroutine
114
+ go func (ds * QuickwitDatasource , readyStatus chan <- es.ReadyStatus ) {
115
+ var status es.ReadyStatus = es.ReadyStatus {
116
+ IsReady : false ,
117
+ Err : nil ,
118
+ }
119
+ for {
120
+ // Will retry init everytime the channel is consumed until ready
121
+ if ! status .IsReady || ds .dsInfo .ShouldInit {
122
+ qwlog .Debug ("Initializing Datasource" )
123
+ status .IsReady = true
124
+ status .Err = nil
125
+
126
+ indexMetadataList , err := GetIndexesMetadata (ds .dsInfo .Database , ds .dsInfo .URL , ds .dsInfo .HTTPClient )
127
+ if err != nil {
128
+ status .IsReady = false
129
+ status .Err = fmt .Errorf ("failed to get index metadata : %w" , err )
130
+ } else if len (indexMetadataList ) == 0 {
131
+ status .IsReady = false
132
+ status .Err = fmt .Errorf ("no index found for %s" , ds .dsInfo .Database )
133
+ } else {
134
+ timeField , timeOutputFormat , err := GetTimestampFieldInfos (indexMetadataList )
135
+ if nil != err {
136
+ status .IsReady = false
137
+ status .Err = err
138
+ } else if "" == timeField {
139
+ status .IsReady = false
140
+ status .Err = fmt .Errorf ("timefield is empty for %s" , ds .dsInfo .Database )
141
+ } else if "" == timeOutputFormat {
142
+ status .Err = fmt .Errorf ("timefield's output_format is empty, logs timestamps will not be parsed correctly for %s" , ds .dsInfo .Database )
143
+ }
144
+
145
+ ds .dsInfo .ConfiguredFields .TimeField = timeField
146
+ ds .dsInfo .ConfiguredFields .TimeOutputFormat = timeOutputFormat
147
+ ds .dsInfo .ShouldInit = false
148
+ }
149
+ }
150
+ readyStatus <- status
151
+ }
152
+ }(ds , model .ReadyStatus )
153
+ return ds , nil
118
154
}
119
155
120
156
// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
121
157
// created. As soon as datasource settings change detected by SDK old datasource instance will
122
158
// be disposed and a new one will be created using NewSampleDatasource factory function.
123
159
func (ds * QuickwitDatasource ) Dispose () {
124
- // Clean up datasource instance resources.
125
- // TODO
160
+ // FIXME: The ReadyStatus channel should probably be closed here, but doing it
161
+ // causes odd calls to healthcheck to fail. Needs investigation
162
+ // close(ds.dsInfo.ReadyStatus)
126
163
}
127
164
128
165
// CheckHealth handles health checks sent from Grafana to the plugin.
@@ -131,13 +168,39 @@ func (ds *QuickwitDatasource) Dispose() {
131
168
// a datasource is working as expected.
132
169
func (ds * QuickwitDatasource ) CheckHealth (ctx context.Context , req * backend.CheckHealthRequest ) (* backend.CheckHealthResult , error ) {
133
170
res := & backend.CheckHealthResult {}
134
-
135
171
res .Status = backend .HealthStatusOk
136
172
res .Message = "plugin is running"
173
+
174
+ ds .dsInfo .ShouldInit = true
175
+ status := <- ds .dsInfo .ReadyStatus
176
+
177
+ if nil != status .Err {
178
+ res .Status = backend .HealthStatusError
179
+ res .Message = fmt .Errorf ("Failed to initialize datasource: %w" , status .Err ).Error ()
180
+ } else if "" == ds .dsInfo .ConfiguredFields .TimeField {
181
+ res .Status = backend .HealthStatusError
182
+ res .Message = fmt .Sprintf ("timefield is missing from index config \" %s\" " , ds .dsInfo .Database )
183
+ } else if "" == ds .dsInfo .ConfiguredFields .TimeOutputFormat {
184
+ res .Status = backend .HealthStatusError
185
+ res .Message = fmt .Sprintf ("timefield's output_format is missing from index config \" %s\" " , ds .dsInfo .Database )
186
+ }
187
+ qwlog .Debug (res .Message )
188
+
137
189
return res , nil
138
190
}
139
191
140
192
func (ds * QuickwitDatasource ) QueryData (ctx context.Context , req * backend.QueryDataRequest ) (* backend.QueryDataResponse , error ) {
193
+ // Ensure ds is initialized, we need timestamp infos
194
+ status := <- ds .dsInfo .ReadyStatus
195
+ if ! status .IsReady {
196
+ qwlog .Debug (fmt .Errorf ("Datasource initialization failed: %w" , status .Err ).Error ())
197
+ response := & backend.QueryDataResponse {
198
+ Responses : backend.Responses {},
199
+ }
200
+ response .Responses ["__qwQueryDataError" ] = backend .ErrDataResponse (backend .StatusInternal , "Datasource initialization failed" )
201
+ return response , nil
202
+ }
203
+
141
204
return queryData (ctx , req .Queries , & ds .dsInfo )
142
205
}
143
206
0 commit comments