@@ -104,6 +104,9 @@ use nexus_db_model::Volume;
104
104
use nexus_db_model:: VolumeRepair ;
105
105
use nexus_db_model:: VolumeResourceUsage ;
106
106
use nexus_db_model:: VpcSubnet ;
107
+ use nexus_db_model:: WebhookDelivery ;
108
+ use nexus_db_model:: WebhookEventClass ;
109
+ use nexus_db_model:: WebhookReceiver ;
107
110
use nexus_db_model:: Zpool ;
108
111
use nexus_db_model:: to_db_typed_uuid;
109
112
use nexus_db_queries:: context:: OpContext ;
@@ -154,6 +157,7 @@ use omicron_uuid_kinds::PhysicalDiskUuid;
154
157
use omicron_uuid_kinds:: PropolisUuid ;
155
158
use omicron_uuid_kinds:: SledUuid ;
156
159
use omicron_uuid_kinds:: VolumeUuid ;
160
+ use omicron_uuid_kinds:: WebhookEventUuid ;
157
161
use omicron_uuid_kinds:: ZpoolUuid ;
158
162
use sled_agent_client:: VolumeConstructionRequest ;
159
163
use std:: borrow:: Cow ;
@@ -1162,14 +1166,19 @@ struct WebhookArgs {
1162
1166
1163
1167
#[ derive( Debug , Subcommand , Clone ) ]
1164
1168
enum WebhookCommands {
1165
- /// Get information on webhook receivers.
1169
+ /// Get information on webhook receivers
1166
1170
#[ clap( alias = "rx" ) ]
1167
1171
Receiver {
1168
1172
#[ command( subcommand) ]
1169
1173
command : WebhookRxCommands ,
1170
1174
} ,
1171
1175
/// Get information on webhook events
1172
1176
Event ,
1177
+ /// Get information on webhook delivieries
1178
+ Delivery {
1179
+ #[ command( subcommand) ]
1180
+ command : WebhookDeliveryCommands ,
1181
+ } ,
1173
1182
}
1174
1183
1175
1184
#[ derive( Debug , Subcommand , Clone ) ]
@@ -1189,10 +1198,44 @@ struct WebhookRxInfoArgs {
1189
1198
1190
1199
#[ derive( Debug , Args , Clone ) ]
1191
1200
struct WebhookRxListArgs {
1192
- #[ clap( long, short) ]
1201
+ #[ clap( long, short = 'a' ) ]
1193
1202
start_at : Option < Uuid > ,
1194
1203
}
1195
1204
1205
+ #[ derive( Debug , Subcommand , Clone ) ]
1206
+ enum WebhookDeliveryCommands {
1207
+ /// List webhook deliveries
1208
+ #[ clap( alias = "ls" ) ]
1209
+ List ( WebhookDeliveryListArgs ) ,
1210
+ }
1211
+
1212
+ #[ derive( Debug , Args , Clone ) ]
1213
+ struct WebhookDeliveryListArgs {
1214
+ /// If present, show only deliveries to this receiver.
1215
+ #[ clap( long, short, alias = "rx" ) ]
1216
+ receiver : Option < NameOrId > ,
1217
+
1218
+ /// If present, select only deliveries for the given event.
1219
+ #[ clap( long, short) ]
1220
+ event : Option < WebhookEventUuid > ,
1221
+
1222
+ /// If present, select only deliveries in the provided state(s)
1223
+ #[ clap( long = "state" , short) ]
1224
+ states : Vec < db:: model:: WebhookDeliveryState > ,
1225
+
1226
+ /// If present, select only deliveries with the provided trigger(s)
1227
+ #[ clap( long = "trigger" , short) ]
1228
+ triggers : Vec < db:: model:: WebhookDeliveryTrigger > ,
1229
+
1230
+ /// Include only delivery entries created before this timestamp
1231
+ #[ clap( long, short) ]
1232
+ before : Option < DateTime < Utc > > ,
1233
+
1234
+ /// Include only delivery entries created after this timestamp
1235
+ #[ clap( long, short) ]
1236
+ after : Option < DateTime < Utc > > ,
1237
+ }
1238
+
1196
1239
impl DbArgs {
1197
1240
/// Run a `omdb db` subcommand.
1198
1241
///
@@ -8084,6 +8127,9 @@ async fn cmd_db_webhook(
8084
8127
WebhookCommands :: Receiver {
8085
8128
command : WebhookRxCommands :: Info ( args) ,
8086
8129
} => cmd_db_webhook_rx_info ( datastore, fetch_opts, args) . await ,
8130
+ WebhookCommands :: Delivery {
8131
+ command : WebhookDeliveryCommands :: List ( args) ,
8132
+ } => cmd_db_webhook_delivery_list ( datastore, fetch_opts, args) . await ,
8087
8133
WebhookCommands :: Event => {
8088
8134
Err ( anyhow:: anyhow!( "not yet implemented, sorry!" ) )
8089
8135
}
@@ -8156,33 +8202,16 @@ async fn cmd_db_webhook_rx_info(
8156
8202
fetch_opts : & DbFetchOptions ,
8157
8203
args : & WebhookRxInfoArgs ,
8158
8204
) -> anyhow:: Result < ( ) > {
8159
- use nexus_db_schema:: schema:: webhook_receiver:: dsl as rx_dsl;
8160
8205
use nexus_db_schema:: schema:: webhook_rx_event_glob:: dsl as glob_dsl;
8161
8206
use nexus_db_schema:: schema:: webhook_rx_subscription:: dsl as subscription_dsl;
8162
8207
use nexus_db_schema:: schema:: webhook_secret:: dsl as secret_dsl;
8163
8208
8164
8209
let conn = datastore. pool_connection_for_tests ( ) . await ?;
8165
- let mut query = match args. receiver {
8166
- NameOrId :: Id ( id) => {
8167
- rx_dsl:: webhook_receiver. filter ( rx_dsl:: id. eq ( id) ) . into_boxed ( )
8168
- }
8169
- NameOrId :: Name ( ref name) => rx_dsl:: webhook_receiver
8170
- . filter ( rx_dsl:: name. eq ( name. to_string ( ) ) )
8171
- . into_boxed ( ) ,
8172
- } ;
8173
- if !fetch_opts. include_deleted {
8174
- query = query. filter ( rx_dsl:: time_deleted. is_null ( ) ) ;
8175
- }
8176
-
8177
- let rx = query
8178
- . limit ( 1 )
8179
- . select ( db:: model:: WebhookReceiver :: as_select ( ) )
8180
- . get_result_async ( & * conn)
8210
+ let rx = lookup_webhook_rx ( datastore, & args. receiver )
8181
8211
. await
8182
- . optional ( )
8183
8212
. with_context ( || format ! ( "loading webhook receiver {}" , args. receiver) ) ?
8184
8213
. ok_or_else ( || {
8185
- anyhow:: anyhow!( "no instance {} exists" , args. receiver)
8214
+ anyhow:: anyhow!( "no webhook receiver {} exists" , args. receiver)
8186
8215
} ) ?;
8187
8216
8188
8217
const ID : & ' static str = "ID" ;
@@ -8217,9 +8246,9 @@ async fn cmd_db_webhook_rx_info(
8217
8246
GLOB_EXACT ,
8218
8247
] ) ;
8219
8248
8220
- let db :: model :: WebhookReceiver {
8249
+ let WebhookReceiver {
8221
8250
identity :
8222
- db :: model :: WebhookReceiverIdentity {
8251
+ nexus_db_model :: WebhookReceiverIdentity {
8223
8252
id,
8224
8253
name,
8225
8254
description,
@@ -8305,7 +8334,7 @@ async fn cmd_db_webhook_rx_info(
8305
8334
. filter ( subscription_dsl:: rx_id. eq ( id. into_untyped_uuid ( ) ) )
8306
8335
. filter ( subscription_dsl:: glob. is_null ( ) )
8307
8336
. select ( subscription_dsl:: event_class)
8308
- . load_async :: < db :: model :: WebhookEventClass > ( & * conn)
8337
+ . load_async :: < WebhookEventClass > ( & * conn)
8309
8338
. await ;
8310
8339
match exact {
8311
8340
Ok ( exact) => {
@@ -8349,7 +8378,7 @@ async fn cmd_db_webhook_rx_info(
8349
8378
. filter ( subscription_dsl:: rx_id. eq ( id. into_untyped_uuid ( ) ) )
8350
8379
. filter ( subscription_dsl:: glob. eq ( glob) )
8351
8380
. select ( subscription_dsl:: event_class)
8352
- . load_async :: < db :: model :: WebhookEventClass > ( & * conn)
8381
+ . load_async :: < WebhookEventClass > ( & * conn)
8353
8382
. await ;
8354
8383
match exact {
8355
8384
Ok ( exact) => {
@@ -8372,6 +8401,190 @@ async fn cmd_db_webhook_rx_info(
8372
8401
Ok ( ( ) )
8373
8402
}
8374
8403
8404
+ async fn cmd_db_webhook_delivery_list (
8405
+ datastore : & DataStore ,
8406
+ fetch_opts : & DbFetchOptions ,
8407
+ args : & WebhookDeliveryListArgs ,
8408
+ ) -> anyhow:: Result < ( ) > {
8409
+ use nexus_db_schema:: schema:: webhook_delivery:: dsl as delivery_dsl;
8410
+ let conn = datastore. pool_connection_for_tests ( ) . await ?;
8411
+ let mut query = delivery_dsl:: webhook_delivery
8412
+ . limit ( fetch_opts. fetch_limit . get ( ) . into ( ) )
8413
+ . order_by ( delivery_dsl:: time_created. desc ( ) )
8414
+ . into_boxed ( ) ;
8415
+
8416
+ if let ( Some ( before) , Some ( after) ) = ( args. before , args. after ) {
8417
+ anyhow:: ensure!(
8418
+ after < before,
8419
+ "if both after and before are included, after must be earlier than before"
8420
+ ) ;
8421
+ }
8422
+
8423
+ if let Some ( before) = args. before {
8424
+ query = query. filter ( delivery_dsl:: time_created. lt ( before) ) ;
8425
+ }
8426
+
8427
+ if let Some ( after) = args. before {
8428
+ query = query. filter ( delivery_dsl:: time_created. gt ( after) ) ;
8429
+ }
8430
+
8431
+ if let Some ( ref receiver) = args. receiver {
8432
+ let rx =
8433
+ lookup_webhook_rx ( datastore, receiver) . await ?. ok_or_else ( || {
8434
+ anyhow:: anyhow!( "no webhook receiver {receiver} found" )
8435
+ } ) ?;
8436
+ query = query. filter ( delivery_dsl:: rx_id. eq ( rx. identity . id ) ) ;
8437
+ }
8438
+
8439
+ if !args. states . is_empty ( ) {
8440
+ query = query. filter ( delivery_dsl:: state. eq_any ( args. states . clone ( ) ) ) ;
8441
+ }
8442
+
8443
+ if !args. triggers . is_empty ( ) {
8444
+ query = query
8445
+ . filter ( delivery_dsl:: triggered_by. eq_any ( args. triggers . clone ( ) ) ) ;
8446
+ }
8447
+
8448
+ let ctx = || "listing webhook receivers" ;
8449
+
8450
+ let deliveries = query
8451
+ . select ( WebhookDelivery :: as_select ( ) )
8452
+ . load_async ( & * conn)
8453
+ . await
8454
+ . with_context ( ctx) ?;
8455
+
8456
+ check_limit ( & deliveries, fetch_opts. fetch_limit , ctx) ;
8457
+
8458
+ #[ derive( Tabled ) ]
8459
+ struct DeliveryRow {
8460
+ id : Uuid ,
8461
+ trigger : nexus_db_model:: WebhookDeliveryTrigger ,
8462
+ state : nexus_db_model:: WebhookDeliveryState ,
8463
+ attempts : u8 ,
8464
+ #[ tabled( display_with = "datetime_rfc3339_concise" ) ]
8465
+ time_created : DateTime < Utc > ,
8466
+ #[ tabled( display_with = "datetime_opt_rfc3339_concise" ) ]
8467
+ time_completed : Option < DateTime < Utc > > ,
8468
+ }
8469
+
8470
+ #[ derive( Tabled ) ]
8471
+ struct WithEventId < T : Tabled > {
8472
+ #[ tabled( inline) ]
8473
+ inner : T ,
8474
+ event_id : Uuid ,
8475
+ }
8476
+
8477
+ #[ derive( Tabled ) ]
8478
+ struct WithRxId < T : Tabled > {
8479
+ #[ tabled( inline) ]
8480
+ inner : T ,
8481
+ receiver_id : Uuid ,
8482
+ }
8483
+
8484
+ impl From < & ' _ WebhookDelivery > for DeliveryRow {
8485
+ fn from ( d : & WebhookDelivery ) -> Self {
8486
+ let WebhookDelivery {
8487
+ id,
8488
+ // event and receiver UUIDs are toggled on and off based on
8489
+ // whether or not we are filtering by receiver and event, so
8490
+ // ignore them here.
8491
+ event_id : _,
8492
+ rx_id : _,
8493
+ attempts,
8494
+ state,
8495
+ time_created,
8496
+ time_completed,
8497
+ // ignore these as they are used for runtime coordination and
8498
+ // aren't very useful for showing delivery history
8499
+ deliverator_id : _,
8500
+ time_leased : _,
8501
+ triggered_by,
8502
+ } = d;
8503
+ Self {
8504
+ id : id. into_untyped_uuid ( ) ,
8505
+ trigger : * triggered_by,
8506
+ state : * state,
8507
+ attempts : attempts. 0 ,
8508
+ time_created : * time_created,
8509
+ time_completed : * time_completed,
8510
+ }
8511
+ }
8512
+ }
8513
+
8514
+ impl < ' d , T > From < & ' d WebhookDelivery > for WithEventId < T >
8515
+ where
8516
+ T : From < & ' d WebhookDelivery > + Tabled ,
8517
+ {
8518
+ fn from ( d : & ' d WebhookDelivery ) -> Self {
8519
+ Self { event_id : d. event_id . into_untyped_uuid ( ) , inner : T :: from ( d) }
8520
+ }
8521
+ }
8522
+
8523
+ impl < ' d , T > From < & ' d WebhookDelivery > for WithRxId < T >
8524
+ where
8525
+ T : From < & ' d WebhookDelivery > + Tabled ,
8526
+ {
8527
+ fn from ( d : & ' d WebhookDelivery ) -> Self {
8528
+ Self { receiver_id : d. rx_id . into_untyped_uuid ( ) , inner : T :: from ( d) }
8529
+ }
8530
+ }
8531
+
8532
+ let mut table = match ( args. receiver . as_ref ( ) , args. event ) {
8533
+ // Filtered by both receiver and event, so don't display either.
8534
+ ( Some ( _) , Some ( _) ) => {
8535
+ tabled:: Table :: new ( deliveries. iter ( ) . map ( DeliveryRow :: from) )
8536
+ }
8537
+ // Filtered by neither receiver nor event, so include both.
8538
+ ( None , None ) => tabled:: Table :: new (
8539
+ deliveries. iter ( ) . map ( WithRxId :: < WithEventId < DeliveryRow > > :: from) ,
8540
+ ) ,
8541
+ // Filtered by receiver ID only
8542
+ ( Some ( _) , None ) => tabled:: Table :: new (
8543
+ deliveries. iter ( ) . map ( WithEventId :: < DeliveryRow > :: from) ,
8544
+ ) ,
8545
+ // Filtered by event ID only
8546
+ ( None , Some ( _) ) => tabled:: Table :: new (
8547
+ deliveries. iter ( ) . map ( WithRxId :: < DeliveryRow > :: from) ,
8548
+ ) ,
8549
+ } ;
8550
+ table
8551
+ . with ( tabled:: settings:: Style :: empty ( ) )
8552
+ . with ( tabled:: settings:: Padding :: new ( 0 , 1 , 0 , 0 ) ) ;
8553
+
8554
+ println ! ( "{table}" ) ;
8555
+ Ok ( ( ) )
8556
+ }
8557
+
8558
+ /// Helper function to look up a webhook receiver with the given name or ID
8559
+ async fn lookup_webhook_rx (
8560
+ datastore : & DataStore ,
8561
+ name_or_id : & NameOrId ,
8562
+ ) -> anyhow:: Result < Option < WebhookReceiver > > {
8563
+ use nexus_db_schema:: schema:: webhook_receiver:: dsl;
8564
+
8565
+ let conn = datastore. pool_connection_for_tests ( ) . await ?;
8566
+ match name_or_id {
8567
+ NameOrId :: Id ( id) => {
8568
+ dsl:: webhook_receiver
8569
+ . filter ( dsl:: id. eq ( * id) )
8570
+ . limit ( 1 )
8571
+ . select ( WebhookReceiver :: as_select ( ) )
8572
+ . get_result_async ( & * conn)
8573
+ . await
8574
+ }
8575
+ NameOrId :: Name ( ref name) => {
8576
+ dsl:: webhook_receiver
8577
+ . filter ( dsl:: name. eq ( name. to_string ( ) ) )
8578
+ . limit ( 1 )
8579
+ . select ( WebhookReceiver :: as_select ( ) )
8580
+ . get_result_async ( & * conn)
8581
+ . await
8582
+ }
8583
+ }
8584
+ . optional ( )
8585
+ . with_context ( || format ! ( "loading webhook_receiver {name_or_id}" ) )
8586
+ }
8587
+
8375
8588
// Format a `chrono::DateTime` in RFC3339 with milliseconds precision and using
8376
8589
// `Z` rather than the UTC offset for UTC timestamps, to save a few characters
8377
8590
// of line width in tabular output.
0 commit comments