Skip to content

Commit a7d48c8

Browse files
committed
Solve issues #104 (pre-V9 Distributed monitoring) #105 (queue max depth)
1 parent efb5196 commit a7d48c8

File tree

4 files changed

+200
-22
lines changed

4 files changed

+200
-22
lines changed

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
# Changelog
22
Newest updates are at the top of this file.
33

4+
## May 31 2019
5+
* mqmetric - Allow limited monitoring of V8 Distributed platforms
6+
(set `ibmmq.usePublications` to *false* to enable in monitor programs) #104
7+
* mqmetric - Added queue_attribute_max_depth to permit %full calculation
8+
(set `ibmmq.useStatus` to *true* to enable in monitor programs) #105
9+
410
## April 23 2019
511
* Fixed memory leak in InqMP
612
* mqmetric - Added ability to set a timezone offset
@@ -9,7 +15,7 @@ Newest updates are at the top of this file.
915
## April 03 2019
1016
* mqmetric - Added last put/get time metric for queues
1117
* mqmetric - Added last msg time metric for channels
12-
* mqmetric - Added fields from QMSTATUS and TPSTATUS
18+
* mqmetric - Added fields from QMSTATUS and TPSTATUS
1319

1420
## April 1 2019
1521
* Added scripts to compile samples inside a container

mqmetric/discover.go

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,21 +80,30 @@ type AllMetrics struct {
8080
Classes map[int]*MonClass
8181
}
8282

83+
type QInfo struct {
84+
MaxDepth int64
85+
}
86+
8387
// QMgrMapKey can never be a real object name and is therefore useful in
8488
// maps that may contain only this single entry
8589
const QMgrMapKey = "@self"
8690

8791
const maxBufSize = 100 * 1024 * 1024 // 100 MB
92+
const defaultMaxQDepth = 5000
8893

8994
// Metrics is the global variable for the tree of data
9095
var Metrics AllMetrics
9196

92-
var qList []string
97+
var qInfoMap map[string]*QInfo
9398
var locale string
9499
var discoveryDone = false
95100

96101
func GetDiscoveredQueues() []string {
97-
return qList
102+
keys := make([]string, 0)
103+
for key := range qInfoMap {
104+
keys = append(keys, key)
105+
}
106+
return keys
98107
}
99108

100109
/*
@@ -107,7 +116,7 @@ func SetLocale(l string) {
107116

108117
/*
109118
* Check any important parameters - this must be called after DiscoverAndSubscribe
110-
* to maintain compatibility of the package's APIs. It also needs the qList to have been
119+
* to maintain compatibility of the package's APIs. It also needs the list of queues to have been
111120
* populated first which is also done in DiscoverAndSubscribe.
112121
* Returns: an MQ CompCode, error string. CompCode can be MQCC_OK, WARNING or ERROR.
113122
*/
@@ -133,8 +142,8 @@ func VerifyConfig() (int32, error) {
133142
// as MQ publications are at 10 second interval by default (and no public tuning)
134143
// and assume monitor collection interval is one minute
135144
// Since we don't do pubsub-based collection on z/OS, this qdepth doesn't matter
136-
recommendedDepth := (20 + len(qList)*5) * 6
137-
if maxQDepth < int32(recommendedDepth) && platform != ibmmq.MQPL_ZOS {
145+
recommendedDepth := (20 + len(qInfoMap)*5) * 6
146+
if maxQDepth < int32(recommendedDepth) && usePublications {
138147
err = fmt.Errorf("Warning: Maximum queue depth on %s may be too low. Current value = %d", replyQBaseName, maxQDepth)
139148
compCode = ibmmq.MQCC_WARNING
140149
}
@@ -161,6 +170,7 @@ func DiscoverAndSubscribe(queueList string, checkQueueList bool, metaPrefix stri
161170
var err error
162171

163172
discoveryDone = true
173+
qInfoMap = make(map[string]*QInfo)
164174

165175
// What metrics can the queue manager provide?
166176
if err == nil {
@@ -172,14 +182,15 @@ func DiscoverAndSubscribe(queueList string, checkQueueList bool, metaPrefix stri
172182
if err == nil {
173183
if checkQueueList {
174184
err = discoverQueues(queueList)
185+
//fmt.Printf("Queue Set size = %d\n",len(qInfoMap))
175186
} else {
176-
qList = strings.Split(queueList, ",")
187+
qList := strings.Split(queueList, ",")
188+
// Make sure the names are reasonably valid
189+
for i := 0; i < len(qList); i++ {
190+
qInfoMap[strings.TrimSpace(qList[i])] = new(QInfo)
191+
}
177192
}
178193

179-
// Make sure the names are reasonably valid
180-
for i := 0; i < len(qList); i++ {
181-
qList[i] = strings.TrimSpace(qList[i])
182-
}
183194
}
184195

185196
// Subscribe to all of the various topics
@@ -426,7 +437,7 @@ func discoverStats(metaPrefix string) error {
426437
Metrics.Classes = make(map[int]*MonClass)
427438

428439
// Allow us to proceed on z/OS even though it does not support pub/sub resources
429-
if metaPrefix == "" && platform == ibmmq.MQPL_ZOS {
440+
if metaPrefix == "" && !usePublications {
430441
return nil
431442
}
432443

@@ -488,11 +499,24 @@ and then use a more general regexp match. Something for a later update perhaps.
488499
*/
489500
func discoverQueues(monitoredQueues string) error {
490501
var err error
502+
var qList []string
503+
491504
qList, err = inquireObjects(monitoredQueues, ibmmq.MQOT_Q)
492505
if len(qList) > 0 {
493506
//fmt.Printf("Monitoring Queues: %v\n", qList)
507+
for i := 0; i < len(qList); i++ {
508+
qName := strings.TrimSpace(qList[i])
509+
qInfoElem := new(QInfo)
510+
qInfoElem.MaxDepth = defaultMaxQDepth
511+
qInfoMap[qName] = qInfoElem
512+
}
513+
514+
if useStatus {
515+
inquireQueueAttributes(monitoredQueues)
516+
}
517+
494518
if err != nil {
495-
// fmt.Printf("Queue Discovery: %v\n", err)
519+
//fmt.Printf("Queue Discovery Error: %v\n", err)
496520
}
497521
return nil
498522
}
@@ -669,13 +693,13 @@ func createSubscriptions() error {
669693
for _, ty := range cl.Types {
670694

671695
if strings.Contains(ty.ObjectTopic, "%s") {
672-
for i := 0; i < len(qList); i++ {
673-
if len(qList[i]) == 0 {
696+
for key, _ := range qInfoMap {
697+
if len(key) == 0 {
674698
continue
675699
}
676-
topic := fmt.Sprintf(ty.ObjectTopic, qList[i])
700+
topic := fmt.Sprintf(ty.ObjectTopic, key)
677701
sub, err = subscribe(topic, &replyQObj)
678-
ty.subHobj[qList[i]] = sub
702+
ty.subHobj[key] = sub
679703
}
680704
} else {
681705
sub, err = subscribe(ty.ObjectTopic, &replyQObj)
@@ -711,7 +735,7 @@ func ProcessPublications() error {
711735
var elementidx int
712736
var value int64
713737

714-
if platform == ibmmq.MQPL_ZOS {
738+
if !usePublications {
715739
return nil
716740
}
717741

mqmetric/mqif.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,19 @@ var (
4646
qmgrConnected = false
4747
queuesOpened = false
4848
subsOpened = false
49+
50+
usePublications = true
51+
useStatus = false
4952
)
5053

5154
type ConnectionConfig struct {
5255
ClientMode bool
5356
UserId string
5457
Password string
5558
TZOffsetSecs float64
59+
60+
UsePublications bool
61+
UseStatus bool
5662
}
5763

5864
/*
@@ -98,6 +104,8 @@ func InitConnection(qMgrName string, replyQ string, cc *ConnectionConfig) error
98104
var qMgrObject ibmmq.MQObject
99105
var v map[int32]interface{}
100106

107+
useStatus = cc.UseStatus
108+
101109
mqod := ibmmq.NewMQOD()
102110
openOptions := ibmmq.MQOO_INQUIRE + ibmmq.MQOO_FAIL_IF_QUIESCING
103111

@@ -116,10 +124,21 @@ func InitConnection(qMgrName string, replyQ string, cc *ConnectionConfig) error
116124
resolvedQMgrName = v[ibmmq.MQCA_Q_MGR_NAME].(string)
117125
platform = v[ibmmq.MQIA_PLATFORM].(int32)
118126
commandLevel = v[ibmmq.MQIA_COMMAND_LEVEL].(int32)
119-
if commandLevel < 900 && platform != ibmmq.MQPL_ZOS && platform != ibmmq.MQPL_APPLIANCE {
120-
err = fmt.Errorf("Queue manager must be at least V9.0 for monitoring.")
121-
errorString = "Unsupported system"
127+
if platform == ibmmq.MQPL_ZOS {
128+
usePublications = false
129+
} else {
130+
if cc.UsePublications == true {
131+
if commandLevel < 900 && platform != ibmmq.MQPL_APPLIANCE {
132+
err = fmt.Errorf("Queue manager must be at least V9.0 for full monitoring. The ibmmq.usePublications configuration parameter can be used to permit limited monitoring.")
133+
errorString = "Unsupported system"
134+
} else {
135+
usePublications = cc.UsePublications
136+
}
137+
} else {
138+
usePublications = false
139+
}
122140
}
141+
123142
}
124143
// Don't need the qMgrObject any more
125144
qMgrObject.Close(0)

mqmetric/queue.go

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const (
4545
ATTR_Q_DEPTH = "depth"
4646
ATTR_Q_SINCE_PUT = "time_since_put"
4747
ATTR_Q_SINCE_GET = "time_since_get"
48+
ATTR_Q_MAX_DEPTH = "attribute_max_depth"
4849
)
4950

5051
var QueueStatus StatusSet
@@ -79,12 +80,21 @@ func QueueInitAttributes() {
7980
QueueStatus.Attributes[attr] = newStatusAttribute(attr, "Input Handles", ibmmq.MQIA_OPEN_INPUT_COUNT)
8081
attr = ATTR_Q_OPPROCS
8182
QueueStatus.Attributes[attr] = newStatusAttribute(attr, "Input Handles", ibmmq.MQIA_OPEN_OUTPUT_COUNT)
83+
8284
// Usually we get the QDepth from published resources, But on z/OS we can get it from the QSTATUS response
83-
if platform == ibmmq.MQPL_ZOS {
85+
if !usePublications {
8486
attr = ATTR_Q_DEPTH
8587
QueueStatus.Attributes[attr] = newStatusAttribute(attr, "Queue Depth", ibmmq.MQIA_CURRENT_Q_DEPTH)
8688
}
8789

90+
// This is not really a momitoring metric but it enables calculations to be made such as %full for
91+
// the queue. It's extracted at startup of the program via INQUIRE_Q and not updated later even if the
92+
// queue definition is changed. It's not easy to generate the % value in this program as the CurDepth will
93+
// usually - but not always - come from the published resource stats. So we don't have direct access to it.
94+
// Recording the MaxDepth allows Prometheus etc to do the calculation regardless of how the CurDepth was obtained.
95+
attr = ATTR_Q_MAX_DEPTH
96+
QueueStatus.Attributes[attr] = newStatusAttribute(attr, "Queue Max Depth", -1)
97+
8898
attr = ATTR_Q_QTIME_SHORT
8999
QueueStatus.Attributes[attr] = newStatusAttribute(attr, "Queue Time Short", ibmmq.MQIACF_Q_TIME_INDICATOR)
90100
QueueStatus.Attributes[attr].index = 0
@@ -178,6 +188,66 @@ func collectQueueStatus(pattern string, instanceType int32) error {
178188
return err
179189
}
180190

191+
// Issue the INQUIRE_Q call for wildcarded queue names and
192+
// extract the required attributes - currently, just the
193+
// Maximum Queue Depth
194+
func inquireQueueAttributes(objectPatternsList string) error {
195+
var err error
196+
197+
statusClearReplyQ()
198+
199+
if objectPatternsList == "" {
200+
return err
201+
}
202+
203+
objectPatterns := strings.Split(strings.TrimSpace(objectPatternsList), ",")
204+
for i := 0; i < len(objectPatterns) && err == nil; i++ {
205+
var buf []byte
206+
pattern := strings.TrimSpace(objectPatterns[i])
207+
if len(pattern) == 0 {
208+
continue
209+
}
210+
211+
putmqmd, pmo, cfh, buf := statusSetCommandHeaders()
212+
213+
// Can allow all the other fields to default
214+
cfh.Command = ibmmq.MQCMD_INQUIRE_Q
215+
216+
// Add the parameters one at a time into a buffer
217+
pcfparm := new(ibmmq.PCFParameter)
218+
pcfparm.Type = ibmmq.MQCFT_STRING
219+
pcfparm.Parameter = ibmmq.MQCA_Q_NAME
220+
pcfparm.String = []string{pattern}
221+
cfh.ParameterCount++
222+
buf = append(buf, pcfparm.Bytes()...)
223+
224+
pcfparm = new(ibmmq.PCFParameter)
225+
pcfparm.Type = ibmmq.MQCFT_INTEGER_LIST
226+
pcfparm.Parameter = ibmmq.MQIACF_Q_ATTRS
227+
pcfparm.Int64Value = []int64{int64(ibmmq.MQIA_MAX_Q_DEPTH)}
228+
cfh.ParameterCount++
229+
buf = append(buf, pcfparm.Bytes()...)
230+
231+
// Once we know the total number of parameters, put the
232+
// CFH header on the front of the buffer.
233+
buf = append(cfh.Bytes(), buf...)
234+
235+
// And now put the command to the queue
236+
err = cmdQObj.Put(putmqmd, pmo, buf)
237+
if err != nil {
238+
return err
239+
}
240+
241+
for allReceived := false; !allReceived; {
242+
cfh, buf, allReceived, err = statusGetReply()
243+
if buf != nil {
244+
parseQAttrData(cfh, buf)
245+
}
246+
}
247+
}
248+
return nil
249+
}
250+
181251
// Given a PCF response message, parse it to extract the desired statistics
182252
func parseQData(instanceType int32, cfh *ibmmq.MQCFH, buf []byte) string {
183253
var elem *ibmmq.PCFParameter
@@ -248,9 +318,68 @@ func parseQData(instanceType int32, cfh *ibmmq.MQCFH, buf []byte) string {
248318
QueueStatus.Attributes[ATTR_Q_SINCE_PUT].Values[key] = newStatusValueInt64(statusTimeDiff(now, lastPutDate, lastPutTime))
249319
QueueStatus.Attributes[ATTR_Q_SINCE_GET].Values[key] = newStatusValueInt64(statusTimeDiff(now, lastGetDate, lastGetTime))
250320

321+
if s, ok := qInfoMap[key]; ok {
322+
maxDepth := s.MaxDepth
323+
QueueStatus.Attributes[ATTR_Q_MAX_DEPTH].Values[key] = newStatusValueInt64(maxDepth)
324+
}
251325
return key
252326
}
253327

328+
func parseQAttrData(cfh *ibmmq.MQCFH, buf []byte) {
329+
var elem *ibmmq.PCFParameter
330+
331+
qName := ""
332+
333+
parmAvail := true
334+
bytesRead := 0
335+
offset := 0
336+
datalen := len(buf)
337+
if cfh.ParameterCount == 0 {
338+
return
339+
}
340+
341+
// Parse it once to extract the fields that are needed for the map key
342+
for parmAvail && cfh.CompCode != ibmmq.MQCC_FAILED {
343+
elem, bytesRead = ibmmq.ReadPCFParameter(buf[offset:])
344+
offset += bytesRead
345+
// Have we now reached the end of the message
346+
if offset >= datalen {
347+
parmAvail = false
348+
}
349+
350+
// Only one field needed for queues
351+
switch elem.Parameter {
352+
case ibmmq.MQCA_Q_NAME:
353+
qName = strings.TrimSpace(elem.String[0])
354+
}
355+
}
356+
357+
// And then re-parse the message so we can store the metrics now knowing the map key
358+
parmAvail = true
359+
offset = 0
360+
for parmAvail && cfh.CompCode != ibmmq.MQCC_FAILED {
361+
elem, bytesRead = ibmmq.ReadPCFParameter(buf[offset:])
362+
offset += bytesRead
363+
// Have we now reached the end of the message
364+
if offset >= datalen {
365+
parmAvail = false
366+
}
367+
368+
switch elem.Parameter {
369+
case ibmmq.MQIA_MAX_Q_DEPTH:
370+
v := elem.Int64Value[0]
371+
if v > 0 {
372+
if qInfo, ok := qInfoMap[qName]; ok {
373+
qInfo.MaxDepth = v
374+
}
375+
}
376+
}
377+
378+
}
379+
380+
return
381+
}
382+
254383
// Return a standardised value.
255384
func QueueNormalise(attr *StatusAttribute, v int64) float64 {
256385
return statusNormalise(attr, v)

0 commit comments

Comments
 (0)