@@ -183,7 +183,8 @@ static void ngx_stream_upsync_recv_handler(ngx_event_t *event);
183
183
static void ngx_stream_upsync_send_handler (ngx_event_t * event );
184
184
static void ngx_stream_upsync_timeout_handler (ngx_event_t * event );
185
185
static void ngx_stream_upsync_clean_event (void * upsync_server );
186
- static ngx_int_t ngx_stream_upsync_parse_init (void * upsync_server );
186
+ static ngx_int_t ngx_stream_upsync_etcd_parse_init (void * upsync_server );
187
+ static ngx_int_t ngx_stream_upsync_consul_parse_init (void * upsync_server );
187
188
static ngx_int_t ngx_stream_upsync_dump_server (
188
189
ngx_stream_upsync_server_t * upsync_server );
189
190
static ngx_int_t ngx_stream_upsync_init_server (ngx_event_t * event );
@@ -204,7 +205,6 @@ static void ngx_stream_upsync_event_init(ngx_stream_upstream_rr_peer_t *peer,
204
205
ngx_stream_upsync_server_t * upsync_server );
205
206
206
207
static ngx_int_t ngx_stream_http_parser_init ();
207
- static void ngx_stream_http_parser_execute (ngx_stream_upsync_ctx_t * ctx );
208
208
209
209
static int ngx_stream_http_status (http_parser * p , const char * buf , size_t len );
210
210
static int ngx_stream_http_header_field_cb (http_parser * p , const char * buf ,
@@ -329,15 +329,15 @@ static ngx_upsync_conf_t ngx_upsync_types[] = {
329
329
NGX_STREAM_UPSYNC_CONSUL ,
330
330
ngx_stream_upsync_send_handler ,
331
331
ngx_stream_upsync_recv_handler ,
332
- ngx_stream_upsync_parse_init ,
332
+ ngx_stream_upsync_consul_parse_init ,
333
333
ngx_stream_upsync_consul_parse_json ,
334
334
ngx_stream_upsync_clean_event },
335
335
336
336
{ ngx_string ("etcd" ),
337
337
NGX_STREAM_UPSYNC_ETCD ,
338
338
ngx_stream_upsync_send_handler ,
339
339
ngx_stream_upsync_recv_handler ,
340
- ngx_stream_upsync_parse_init ,
340
+ ngx_stream_upsync_etcd_parse_init ,
341
341
ngx_stream_upsync_etcd_parse_json ,
342
342
ngx_stream_upsync_clean_event },
343
343
@@ -1347,6 +1347,18 @@ ngx_stream_upsync_etcd_parse_json(void *data)
1347
1347
"upsync_parse_json: root error" );
1348
1348
return NGX_ERROR ;
1349
1349
}
1350
+
1351
+ cJSON * errorCode = cJSON_GetObjectItem (root , "errorCode" );
1352
+
1353
+ if (errorCode != NULL ) {
1354
+ if (errorCode -> valueint == 401 ) { // trigger reload, we've gone too far with index
1355
+ upsync_server -> index = 0 ;
1356
+ upsync_type_conf -> clean (upsync_server );
1357
+ ngx_add_timer (& upsync_server -> upsync_ev , 0 );
1358
+ }
1359
+ cJSON_Delete (root );
1360
+ return NGX_ERROR ;
1361
+ }
1350
1362
1351
1363
cJSON * action = cJSON_GetObjectItem (root , "action" );
1352
1364
if (action != NULL ) {
@@ -2470,9 +2482,9 @@ ngx_stream_upsync_send_handler(ngx_event_t *event)
2470
2482
2471
2483
if (upsync_type_conf -> upsync_type == NGX_STREAM_UPSYNC_ETCD ) {
2472
2484
if (upsync_server -> index != 0 ) {
2473
- ngx_sprintf (request , "GET %V?wait=true&recursive=true&waitIndex=%d"
2485
+ ngx_sprintf (request , "GET %V?wait=true&recursive=true&waitIndex=%d"
2474
2486
" HTTP/1.0\r\nHost: %V\r\nAccept: */*\r\n\r\n" ,
2475
- & upscf -> upsync_send , upsync_server -> index ,
2487
+ & upscf -> upsync_send , upsync_server -> index ,
2476
2488
& upscf -> conf_server .name );
2477
2489
2478
2490
} else {
@@ -2637,33 +2649,100 @@ ngx_stream_upsync_recv_handler(ngx_event_t *event)
2637
2649
2638
2650
2639
2651
static ngx_int_t
2640
- ngx_stream_upsync_parse_init (void * data )
2652
+ ngx_stream_upsync_consul_parse_init (void * data )
2641
2653
{
2642
- ngx_upsync_conf_t * upsync_type_conf ;
2654
+ char * buf ;
2655
+ size_t parsed ;
2643
2656
ngx_stream_upsync_ctx_t * ctx ;
2644
2657
ngx_stream_upsync_server_t * upsync_server = data ;
2645
2658
2646
- upsync_type_conf = upsync_server -> upscf -> upsync_type_conf ;
2647
2659
ctx = & upsync_server -> ctx ;
2648
2660
2649
- if (upsync_type_conf -> upsync_type == NGX_STREAM_UPSYNC_CONSUL
2650
- || upsync_type_conf -> upsync_type == NGX_STREAM_UPSYNC_ETCD )
2651
- {
2652
- if (ngx_stream_http_parser_init () == NGX_ERROR ) {
2653
- return NGX_ERROR ;
2661
+ if (ngx_stream_http_parser_init () == NGX_ERROR ) {
2662
+ return NGX_ERROR ;
2663
+ }
2664
+
2665
+ buf = (char * )ctx -> recv .pos ;
2666
+
2667
+ ctx -> body .pos = ctx -> body .last = NULL ;
2668
+
2669
+ parsed = http_parser_execute (parser , & settings , buf , ngx_strlen (buf ));
2670
+ if (parsed != ngx_strlen (buf )) {
2671
+ ngx_log_error (NGX_LOG_ERR , ngx_cycle -> log , 0 ,
2672
+ "upsync_consul_parse_init: parsed body size is wrong" );
2673
+ return NGX_ERROR ;
2674
+ }
2675
+
2676
+ if (ngx_strncmp (state .status , "OK" , 2 ) == 0 ) {
2677
+
2678
+ if (ngx_strlen (state .http_body ) != 0 ) {
2679
+ ctx -> body .pos = state .http_body ;
2680
+ ctx -> body .last = state .http_body + ngx_strlen (state .http_body );
2681
+
2654
2682
}
2683
+ }
2655
2684
2656
- ngx_stream_http_parser_execute (ctx );
2657
- if (ctx -> body .pos != ctx -> body .last ) {
2658
- * (ctx -> body .last + 1 ) = '\0' ;
2685
+ if (parser != NULL ) {
2686
+ ngx_free (parser );
2687
+ parser = NULL ;
2688
+ }
2689
+
2690
+ if (ctx -> body .pos != ctx -> body .last ) {
2691
+ * (ctx -> body .last + 1 ) = '\0' ;
2692
+
2693
+ } else {
2694
+ return NGX_ERROR ;
2695
+ }
2696
+
2697
+ return NGX_OK ;
2698
+ }
2699
+
2700
+
2701
+ static ngx_int_t
2702
+ ngx_stream_upsync_etcd_parse_init (void * data )
2703
+ {
2704
+ char * buf ;
2705
+ size_t parsed ;
2706
+ ngx_stream_upsync_ctx_t * ctx ;
2707
+ ngx_stream_upsync_server_t * upsync_server = data ;
2708
+
2709
+ ctx = & upsync_server -> ctx ;
2710
+
2711
+ if (ngx_stream_http_parser_init () == NGX_ERROR ) {
2712
+ return NGX_ERROR ;
2713
+ }
2714
+
2715
+ buf = (char * )ctx -> recv .pos ;
2716
+
2717
+ ctx -> body .pos = ctx -> body .last = NULL ;
2718
+
2719
+ parsed = http_parser_execute (parser , & settings , buf , ngx_strlen (buf ));
2720
+ if (parsed != ngx_strlen (buf )) {
2721
+ ngx_log_error (NGX_LOG_ERR , ngx_cycle -> log , 0 ,
2722
+ "upsync_etcd_parse_init: parsed body size is wrong" );
2723
+ return NGX_ERROR ;
2724
+ }
2725
+
2726
+ if (ngx_strncmp (state .status , "OK" , 2 ) == 0
2727
+ || ngx_strncmp (state .status , "Bad" , 3 ) == 0 ) {
2728
+
2729
+ if (ngx_strlen (state .http_body ) != 0 ) {
2730
+ ctx -> body .pos = state .http_body ;
2731
+ ctx -> body .last = state .http_body + ngx_strlen (state .http_body );
2659
2732
2660
- } else {
2661
- return NGX_ERROR ;
2662
2733
}
2734
+ }
2735
+
2736
+ if (parser != NULL ) {
2737
+ ngx_free (parser );
2738
+ parser = NULL ;
2739
+ }
2740
+
2741
+ if (ctx -> body .pos != ctx -> body .last ) {
2742
+ * (ctx -> body .last + 1 ) = '\0' ;
2663
2743
2664
2744
} else {
2665
- ctx -> body .pos = ctx -> recv .pos ;
2666
- ctx -> body .last = ctx -> recv .last ;
2745
+ return NGX_ERROR ;
2667
2746
}
2668
2747
2669
2748
return NGX_OK ;
@@ -3032,41 +3111,6 @@ ngx_stream_http_parser_init()
3032
3111
}
3033
3112
3034
3113
3035
- static void
3036
- ngx_stream_http_parser_execute (ngx_stream_upsync_ctx_t * ctx )
3037
- {
3038
- char * buf ;
3039
- size_t parsed ;
3040
-
3041
- buf = (char * )ctx -> recv .pos ;
3042
-
3043
- ctx -> body .pos = ctx -> body .last = NULL ;
3044
-
3045
- parsed = http_parser_execute (parser , & settings , buf , ngx_strlen (buf ));
3046
- if (parsed != ngx_strlen (buf )) {
3047
- ngx_log_error (NGX_LOG_ERR , ngx_cycle -> log , 0 ,
3048
- "http_parser_execute: parsed body size is wrong" );
3049
- return ;
3050
- }
3051
-
3052
- if (ngx_strncmp (state .status , "OK" , 2 ) == 0 ) {
3053
-
3054
- if (ngx_strlen (state .http_body ) != 0 ) {
3055
- ctx -> body .pos = state .http_body ;
3056
- ctx -> body .last = state .http_body + ngx_strlen (state .http_body );
3057
-
3058
- } else if (ngx_strlen (state .http_body ) == 0 ) {
3059
- return ;
3060
- }
3061
- }
3062
-
3063
- ngx_free (parser );
3064
- parser = NULL ;
3065
-
3066
- return ;
3067
- }
3068
-
3069
-
3070
3114
static int
3071
3115
ngx_stream_http_status (http_parser * p , const char * buf , size_t len )
3072
3116
{
0 commit comments