@@ -3,7 +3,7 @@ use std::sync::Arc;
 use anyhow::Result;
 use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
 use spin_core::async_trait;
-use spin_factor_blobstore::{Error, Container, ContainerManager};
+use spin_factor_blobstore::{Container, ContainerManager, Error};
 
 pub mod auth;
 mod incoming_data;
@@ -18,21 +18,25 @@ pub struct AzureContainerManager {
 }
 
 impl AzureContainerManager {
-    pub fn new(
-        auth_options: AzureBlobAuthOptions,
-    ) -> Result<Self> {
+    pub fn new(auth_options: AzureBlobAuthOptions) -> Result<Self> {
         let (account, credentials) = match auth_options {
-            AzureBlobAuthOptions::AccountKey(config) => {
-                (config.account.clone(), azure_storage::StorageCredentials::access_key(&config.account, config.key.clone()))
-            },
+            AzureBlobAuthOptions::AccountKey(config) => (
+                config.account.clone(),
+                azure_storage::StorageCredentials::access_key(&config.account, config.key.clone()),
+            ),
             AzureBlobAuthOptions::Environmental => {
                 let account = std::env::var("STORAGE_ACCOUNT").expect("missing STORAGE_ACCOUNT");
-                let access_key = std::env::var("STORAGE_ACCESS_KEY").expect("missing STORAGE_ACCOUNT_KEY");
-                (account.clone(), azure_storage::StorageCredentials::access_key(account, access_key))
-            },
+                let access_key =
+                    std::env::var("STORAGE_ACCESS_KEY").expect("missing STORAGE_ACCOUNT_KEY");
+                (
+                    account.clone(),
+                    azure_storage::StorageCredentials::access_key(account, access_key),
+                )
+            }
         };
 
-        let client = azure_storage_blobs::prelude::ClientBuilder::new(account, credentials).blob_service_client();
+        let client = azure_storage_blobs::prelude::ClientBuilder::new(account, credentials)
+            .blob_service_client();
         Ok(Self { client })
     }
 }
@@ -51,7 +55,10 @@ impl ContainerManager for AzureContainerManager {
     }
 
     fn summary(&self, _store_name: &str) -> Option<String> {
-        Some(format!("Azure blob storage account {}", self.client.account()))
+        Some(format!(
+            "Azure blob storage account {}",
+            self.client.account()
+        ))
     }
 }
 
@@ -101,17 +108,31 @@ impl Container for AzureContainer {
         Ok(self.client.blob_client(name).exists().await?)
     }
 
-    async fn object_info(&self, name: &str) -> anyhow::Result<spin_factor_blobstore::ObjectMetadata> {
+    async fn object_info(
+        &self,
+        name: &str,
+    ) -> anyhow::Result<spin_factor_blobstore::ObjectMetadata> {
         let response = self.client.blob_client(name).get_properties().await?;
         Ok(spin_factor_blobstore::ObjectMetadata {
             name: name.to_string(),
             container: self.client.container_name().to_string(),
-            created_at: response.blob.properties.creation_time.unix_timestamp().try_into().unwrap(),
+            created_at: response
+                .blob
+                .properties
+                .creation_time
+                .unix_timestamp()
+                .try_into()
+                .unwrap(),
             size: response.blob.properties.content_length,
         })
     }
 
-    async fn get_data(&self, name: &str, start: u64, end: u64) -> anyhow::Result<Box<dyn spin_factor_blobstore::IncomingData>> {
+    async fn get_data(
+        &self,
+        name: &str,
+        start: u64,
+        end: u64,
+    ) -> anyhow::Result<Box<dyn spin_factor_blobstore::IncomingData>> {
         // We can't use a Rust range because the Azure type does not accept inclusive ranges,
         // and we don't want to add 1 to `end` if it's already at MAX!
         let range = if end == u64::MAX {
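
Note on the comment in the hunk above: `get_data` takes an inclusive `start`/`end` pair, while the Azure range type is end-exclusive, so the code branches on `end == u64::MAX` instead of blindly adding 1. A minimal sketch of that guard, with a hypothetical helper and a plain tuple standing in for the Azure range type actually used in the commit:

```rust
// Illustrative only: the real code builds the Azure SDK's range type, not a tuple.
// The point is avoiding overflow when converting an inclusive `end` (where
// u64::MAX means "to the end of the blob") into an exclusive upper bound.
fn exclusive_range(start: u64, end: u64) -> (u64, u64) {
    if end == u64::MAX {
        (start, u64::MAX) // already covers the rest of the blob; `end + 1` would overflow
    } else {
        (start, end + 1) // safe: end < u64::MAX
    }
}
```
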
@@ -123,12 +144,20 @@ impl Container for AzureContainer {
         Ok(Box::new(AzureIncomingData::new(client, range)))
     }
 
-    async fn connect_stm(&self, name: &str, stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>) -> anyhow::Result<()> {
+    async fn connect_stm(
+        &self,
+        name: &str,
+        stm: tokio::io::ReadHalf<tokio::io::SimplexStream>,
+        finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>,
+    ) -> anyhow::Result<()> {
         let client = self.client.blob_client(name);
 
         tokio::spawn(async move {
             let result = Self::connect_stm_core(stm, client).await;
-            finished_tx.send(result).await.expect("should sent finish tx");
+            finished_tx
+                .send(result)
+                .await
+                .expect("should sent finish tx");
         });
 
         Ok(())
@@ -141,7 +170,10 @@ impl Container for AzureContainer {
 }
 
 impl AzureContainer {
-    async fn connect_stm_core(mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, client: azure_storage_blobs::prelude::BlobClient) -> anyhow::Result<()> {
+    async fn connect_stm_core(
+        mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>,
+        client: azure_storage_blobs::prelude::BlobClient,
+    ) -> anyhow::Result<()> {
         use tokio::io::AsyncReadExt;
 
         // Azure limits us to 50k blocks per blob. At 2MB/block that allows 100GB, which will be
@@ -162,22 +194,24 @@ impl AzureContainer {
                     let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
                     let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
                     client.put_block(block_id.clone(), bytes).await?;
-                    blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
+                    blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
+                        block_id,
+                    ));
                     break 'put_blocks;
                 }
                 if len >= BLOCK_SIZE {
                     let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
                     let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
                     client.put_block(block_id.clone(), bytes).await?;
-                    blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
+                    blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
+                        block_id,
+                    ));
                     break;
                 }
             }
         }
 
-        let block_list = azure_storage_blobs::blob::BlockList {
-            blocks
-        };
+        let block_list = azure_storage_blobs::blob::BlockList { blocks };
         client.put_block_list(block_list).await?;
 
         Ok(())
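
For readers skimming the reformatted fragments of `connect_stm_core`: the upload path stages each chunk with `put_block`, keeps the generated IDs as `Uncommitted` block references, and commits them at the end with `put_block_list`. A condensed sketch of that flow, using only the calls visible in the diff; `next_chunk` is a hypothetical stand-in for the function's actual read loop and block-size handling:

```rust
// Condensed sketch of the block upload flow shown above (not the actual function).
// `client` is an azure_storage_blobs::prelude::BlobClient.
async fn upload_in_blocks(
    client: azure_storage_blobs::prelude::BlobClient,
    mut next_chunk: impl FnMut() -> Option<Vec<u8>>,
) -> anyhow::Result<()> {
    let mut blocks = Vec::new();
    while let Some(bytes) = next_chunk() {
        // Stage one block and remember its ID so it can be committed later.
        let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
        let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
        client.put_block(block_id.clone(), bytes).await?;
        blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
    }
    // Committing the block list is what makes the staged blocks visible as the blob's content.
    let block_list = azure_storage_blobs::blob::BlockList { blocks };
    client.put_block_list(block_list).await?;
    Ok(())
}
```
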