28
28
)
29
29
from azure .mgmt .storage import StorageManagementClient
30
30
from azure .mgmt .storage .models import StorageQueue
31
+ from azure .core .exceptions import ResourceNotFoundError
31
32
32
33
33
34
class AzureAutoloaderResourcesUtility (UtilitiesInterface ):
@@ -43,7 +44,6 @@ class AzureAutoloaderResourcesUtility(UtilitiesInterface):
43
44
credential (TokenCredential): Credentials to authenticate with Storage Account
44
45
event_subscription_name (str): Name of the Event Subscription
45
46
queue_name (str): Name of the queue that will be used for the Endpoint of the Messages
46
- system_topic_name (optional, str): The system topic name. Defaults to the storage account name if not provided.
47
47
"""
48
48
49
49
subscription_id : str
@@ -54,7 +54,6 @@ class AzureAutoloaderResourcesUtility(UtilitiesInterface):
54
54
credential : TokenCredential
55
55
event_subscription_name : str
56
56
queue_name : str
57
- system_topic_name : str
58
57
59
58
def __init__ (
60
59
self ,
@@ -66,7 +65,6 @@ def __init__(
66
65
credential : TokenCredential ,
67
66
event_subscription_name : str ,
68
67
queue_name : str ,
69
- system_topic_name : str = None ,
70
68
) -> None :
71
69
self .subscription_id = subscription_id
72
70
self .resource_group_name = resource_group_name
@@ -76,9 +74,6 @@ def __init__(
76
74
self .credential = credential
77
75
self .event_subscription_name = event_subscription_name
78
76
self .queue_name = queue_name
79
- self .system_topic_name = (
80
- storage_account if system_topic_name is None else system_topic_name
81
- )
82
77
83
78
@staticmethod
84
79
def system_type ():
@@ -104,21 +99,16 @@ def execute(self) -> bool:
104
99
credential = self .credential , subscription_id = self .subscription_id
105
100
)
106
101
107
- account_properties = storage_mgmt_client .storage_accounts .get_properties (
108
- resource_group_name = self .resource_group_name ,
109
- account_name = self .storage_account ,
110
- )
111
-
112
- queue_response = storage_mgmt_client .queue .list (
113
- resource_group_name = self .resource_group_name ,
114
- account_name = self .storage_account ,
115
- )
116
-
117
- queue_list = [
118
- queue for queue in queue_response if queue .name == self .queue_name
119
- ]
102
+ try :
103
+ queue_response = storage_mgmt_client .queue .get (
104
+ resource_group_name = self .resource_group_name ,
105
+ account_name = self .storage_account ,
106
+ queue_name = self .queue_name ,
107
+ )
108
+ except ResourceNotFoundError :
109
+ queue_response = None
120
110
121
- if queue_list == [] :
111
+ if queue_response == None :
122
112
storage_mgmt_client .queue .create (
123
113
resource_group_name = self .resource_group_name ,
124
114
account_name = self .storage_account ,
@@ -130,47 +120,18 @@ def execute(self) -> bool:
130
120
credential = self .credential , subscription_id = self .subscription_id
131
121
)
132
122
133
- system_topic_response = eventgrid_client .system_topics .list_by_resource_group (
134
- resource_group_name = self .resource_group_name ,
135
- filter = "name eq '{}'" .format (self .system_topic_name ),
136
- )
137
-
138
123
source = "/subscriptions/{}/resourceGroups/{}/providers/Microsoft.Storage/StorageAccounts/{}" .format (
139
124
self .subscription_id , self .resource_group_name , self .storage_account
140
125
)
141
126
142
- system_topic_list = [
143
- system_topic
144
- for system_topic in system_topic_response
145
- if system_topic .source == source
146
- ]
147
-
148
- if system_topic_list == []:
149
- eventgrid_client .system_topics .begin_create_or_update (
150
- resource_group_name = self .resource_group_name ,
151
- system_topic_name = self .system_topic_name ,
152
- system_topic_info = SystemTopic (
153
- location = account_properties .location ,
154
- source = source ,
155
- topic_type = "Microsoft.Storage.StorageAccounts" ,
156
- ),
157
- ).result ()
158
-
159
- system_topic_event_subscription_response = (
160
- eventgrid_client .system_topic_event_subscriptions .list_by_system_topic (
161
- resource_group_name = self .resource_group_name ,
162
- system_topic_name = self .system_topic_name ,
163
- filter = "name eq '{}'" .format (self .event_subscription_name ),
127
+ try :
128
+ event_subscription_response = eventgrid_client .event_subscriptions .get (
129
+ scope = source , event_subscription_name = self .event_subscription_name
164
130
)
165
- )
166
-
167
- system_topic_event_subscription_list = [
168
- system_topic_event_subscription
169
- for system_topic_event_subscription in system_topic_event_subscription_response
170
- if system_topic_event_subscription .source == source
171
- ]
131
+ except ResourceNotFoundError :
132
+ event_subscription_response = None
172
133
173
- if system_topic_event_subscription_list == [] :
134
+ if event_subscription_response == None :
174
135
event_subscription_destination = StorageQueueEventSubscriptionDestination (
175
136
resource_id = source ,
176
137
queue_name = self .queue_name ,
@@ -210,10 +171,10 @@ def execute(self) -> bool:
210
171
retry_policy = retry_policy ,
211
172
)
212
173
213
- eventgrid_client .system_topic_event_subscriptions .begin_create_or_update (
214
- resource_group_name = self .resource_group_name ,
215
- system_topic_name = self .system_topic_name ,
174
+ eventgrid_client .event_subscriptions .begin_create_or_update (
175
+ scope = source ,
216
176
event_subscription_name = self .event_subscription_name ,
217
177
event_subscription_info = event_subscription_info ,
218
178
).result ()
179
+
219
180
return True
0 commit comments