module aws.s3;

import aws.aws;
import aws.credentials;
import aws.sigv4;

import std.typecons: Tuple, tuple;
import std.string : toLower;

/++
Storage classes understood by this client.

Every member name is spelled exactly like its string value. `S3.list` parses
the `StorageClass` element of a listing with `std.conv.to`, which matches
enum members by name, so a member whose name differed from its value could
not be parsed from a response. A storage class that is not listed here makes
`S3.list` throw a `ConvException`.
+/
enum StorageClass: string
{
    STANDARD = "STANDARD",
    REDUCED_REDUNDANCY = "REDUCED_REDUNDANCY",
    GLACIER = "GLACIER",
    STANDARD_IA = "STANDARD_IA",
    ONEZONE_IA = "ONEZONE_IA",
    INTELLIGENT_TIERING = "INTELLIGENT_TIERING",
    DEEP_ARCHIVE = "DEEP_ARCHIVE",
    OUTPOSTS = "OUTPOSTS",
    GLACIER_IR = "GLACIER_IR",
    SNOW = "SNOW",
    EXPRESS_ONEZONE = "EXPRESS_ONEZONE",
}

/++
Result of one `S3.list` call (a single page of a bucket listing).
+/
struct BucketListResult
{
    /// One `Contents` entry of the listing.
    static struct S3Resource
    {
        /// Owner of an object. `S3.list` currently leaves this at its default value.
        static struct Owner
        {
            string id;
            string displayName;
        }

        /// Object key, as returned by the service.
        string key;
        /// Last modification time, kept as the raw text of the response.
        /// The misspelled name is retained for backward compatibility.
        string lastModfied;
        /// Correctly spelled alias of `lastModfied`.
        alias lastModified = lastModfied;
        /// Entity tag, as returned by the service.
        string etag;
        /// Object size.
        ulong size;
        /// Not populated by `S3.list`.
        Owner owner;
        /// Storage class of the object.
        StorageClass storageClass;
    }

    /// Text of the `Name` element.
    string name;
    /// Text of the top-level `Prefix` element.
    string prefix;
    /// Text of the `Marker` element.
    string marker;
    /// Text of `NextContinuationToken`; only set when `isTruncated` is true.
    /// Pass it as `marker` to `S3.list` to request the following page.
    string nextMarker;
    /// Listed objects.
    S3Resource[] resources;
    /// Text of every `CommonPrefixes/Prefix` element.
    string[] commonPrefixes;
    /// Value of the `MaxKeys` element.
    uint maxKeys;
    /// Value of the `IsTruncated` element.
    bool isTruncated;
}

// auto listFilesRecursive(S3 client, string path = null)
// {
//     import std.algorithm : endsWith;
//     import std.range : empty;
//     if(!path.empty && !path.endsWith("/"))
//         path ~= "/";
//     return S3Resources(client, null, path);
// }

// auto listFiles(S3 client, string path = null)
// {
//     import std.algorithm : endsWith;
//     import std.range : empty;
//     if(!path.empty && !path.endsWith("/"))
//         path ~= "/";
//     return S3Resources(client, "/", path);
// }

// auto listFolders(S3 client, string path = null)
// {
//     import std.range : empty;
//     import std.algorithm : endsWith;
//     if(!path.empty && !path.endsWith("/"))
//         path ~= "/";
//     return S3Prefixes(client, "/", path);
// }

// struct S3Resources
// {
//     import std.range.primitives;
//     mixin _S3Common;

//     auto front() @property
//     {
//         assert(!empty);
//         return res.resources.front;
//     }

//     auto empty() const @property
//     {
//         return res.resources.empty;
//     }

//     auto popFront()
//     {
//         assert(!empty);
//         res.resources.popFront;
//         if(empty && res.isTruncated)
//         {
//             next;
//         }
//     }

// }

// struct S3Prefixes
// {
//     import std.range.primitives;
//     mixin _S3Common;

//     auto front() @property
//     {
//         assert(!empty);
//         return res.commonPrefixes.front;
//     }

//     auto empty() const @property
//     {
//         return res.commonPrefixes.empty;
//     }

//     auto popFront()
//     {
//         assert(!empty);
//         res.commonPrefixes.popFront;
//         if(empty && res.isTruncated)
//         {
//             next;
//         }
//     }
// }

// private mixin template _S3Common()
// {
//     private S3 client;
//     private BucketListResult res;
//     private string delimiter;
//     private string prefix;
//     private uint maxKeys;

//     private void next()
//     {
//         res = client.list(delimiter, prefix, res.nextMarker, maxKeys);
//     }

//     @disable this();

//     this(S3 client, string delimiter, string prefix, uint maxKeys = 0)
//     {
//         this.client = client;
//         this.delimiter = delimiter;
//         this.prefix = prefix;
//         this.maxKeys = maxKeys;
//         next();
//     }

//     auto save() @property
//     {
//         return this;
//     }
// }

import arsd.dom;

/++
S3 client built on top of `RESTClient`.
+/
class S3 : RESTClient
{
    /++
    Creates a client for the given endpoint and region.

    All arguments are forwarded to `RESTClient` together with the fixed
    service name "s3".
    +/
    this(string endpoint, string region, AWSCredentialSource credsSource) nothrow @safe {
        super(endpoint, region, "s3", credsSource);
    }

    /++
    Creates a bucket by issuing a `PUT` on `bucket` with an empty body.

    The response is passed to `checkForError`.
    +/
    auto createBucket(string bucket) {
        string[string] headers;
        string[] signedHeaders = null;
        ubyte[] input = null;
        auto resp = doUpload("PUT", bucket, null, headers, signedHeaders, input, 1024);
        checkForError(resp);
    }

    /++
    Requests one page of a bucket listing (`list-type=2`).

    Params:
        bucket = bucket to list
        delimiter = sent as `delimiter` when not null
        prefix = sent as `prefix`; null is sent as an empty string
        marker = sent as `continuation-token` when not null; use the
                 `nextMarker` of the previous page
        maxKeys = sent as `max-keys` when non-zero; must not exceed 1000

    Returns: a `BucketListResult` filled from the XML response.

    The response is passed to `checkForError` before it is parsed.
    `std.conv.ConvException` is thrown when `MaxKeys`, `IsTruncated`, `Size`
    or `StorageClass` cannot be converted.
    +/
    auto list(string bucket, string delimiter = null, string prefix = null, string marker = null, uint maxKeys = 0)
    {
        import std.conv : to;

        assert(maxKeys <= 1000);

        string[string] headers;
        string[string] queryParameters;
        queryParameters["list-type"] = "2";
        // NOTE(review): with encoding-type=url the service may return encoded
        // keys and prefixes; they are stored here without decoding - confirm
        // what callers expect.
        if (bucket.length)
            queryParameters["encoding-type"] = "url";

        if (delimiter !is null)
            queryParameters["delimiter"] = delimiter;

        // The prefix parameter is always sent, even when empty.
        queryParameters["prefix"] = prefix is null ? "" : prefix;

        if (marker !is null)
            queryParameters["continuation-token"] = marker;

        if (maxKeys)
            queryParameters["max-keys"] = maxKeys.to!string;

        auto resp = doRequest("GET", bucket~"/", queryParameters, headers);
        checkForError(resp);
        auto response = readXML(resp);

        BucketListResult result;
        result.name = response.querySelector("listbucketresult name").safeInnerText;
        result.prefix = response.querySelector("listbucketresult prefix").safeInnerText;
        // NOTE(review): list-type=2 responses are documented with a
        // ContinuationToken element rather than Marker - confirm whether
        // this field is ever filled.
        result.marker = response.querySelector("listbucketresult marker").safeInnerText;
        result.maxKeys = response.querySelector("listbucketresult maxkeys").safeInnerText.to!uint;
        result.isTruncated = response.querySelector("listbucketresult istruncated").safeInnerText.toLower.to!bool;

        if (result.isTruncated)
            result.nextMarker = response.querySelector("listbucketresult nextcontinuationtoken").safeInnerText;

        auto entries = response.querySelectorAll("listbucketresult contents");
        result.resources.reserve(entries.length);
        foreach (node; entries)
        {
            BucketListResult.S3Resource entry;

            entry.key = node.querySelector("key").safeInnerText;
            // Lower-case like every other selector in this function, so the
            // lookup does not depend on case-insensitive selector matching.
            entry.lastModfied = node.querySelector("lastmodified").safeInnerText;
            entry.etag = node.querySelector("etag").safeInnerText;
            entry.size = node.querySelector("size").safeInnerText.to!ulong;
            entry.storageClass = node.querySelector("storageclass").safeInnerText.to!StorageClass;

            result.resources ~= entry;
        }

        auto prefixes = response.querySelectorAll("listbucketresult commonprefixes prefix");
        result.commonPrefixes.reserve(prefixes.length);
        foreach (node; prefixes)
            result.commonPrefixes ~= node.innerText;

        return result;
    }

    /++
    Uploads `input` to `bucket/resource` with a `PUT` request.

    Params:
        bucket = target bucket
        resource = object key inside the bucket
        input = data to send; handed to `doUpload` unchanged
        contentType = value of the `content-type` header
        storageClass = value of the `x-amz-storage-class` header, which is
                       also listed as a signed header
        chunkSize = forwarded to `doUpload`

    The response is passed to `checkForError`.
    +/
    void upload(InputStream)(
        string bucket,
        string resource,
        InputStream input,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        size_t chunkSize = 512*1024,
        )
    {
        string[string] headers;
        headers["content-type"] = contentType;
        // Send the enum's string value (identical to the member name).
        headers["x-amz-storage-class"] = cast(string) storageClass;
        string[] signedHeaders = ["x-amz-storage-class"];
        auto resp = doUpload("PUT", bucket~"/"~resource, null, headers, signedHeaders, input, chunkSize);
        checkForError(resp);
    }

    /++
    Issues a `GET` for `bucket/resource`.

    Returns: the response object of `doRequest`, after `checkForError`.
    +/
    auto download(string bucket, string resource,
        string[string] queryParameters = null, string[string] headers = null)
    {
        auto resp = doRequest("GET", bucket~"/"~resource, queryParameters, headers);
        checkForError(resp);
        return resp;
    }

    /++
    Issues a `HEAD` for `bucket/resource`.

    Returns: the `responseHeaders` of the response, after `checkForError`.
    +/
    auto info(string bucket, string resource, string[string] queryParameters = null, string[string] headers = null)
    {
        auto resp = doRequest("HEAD", bucket~"/"~resource, queryParameters, headers);
        checkForError(resp);
        return resp.responseHeaders;
    }
    /+
    /++
    On_failure: aborts multipart upload.
    +/
    void multipartUpload(
        string resource,
        scope InputStream input,
        InetHeaderMap headers = InetHeaderMap.init,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        SysTime expires = SysTime.init,
        size_t chunkSize = 512*1024,
        size_t partSize = 5*1024*1024,
        )
    {
        import vibe.d : logDebug, logWarn, createMemoryStream;
        import std.array: appender, uninitializedArray;
        import std.algorithm.comparison: min;
        import std.exception : enforce;
        logDebug("multipartUpload for %s ...", resource);
        enforce(partSize >= 5 * 1024 * 1024, "multipartUpload: minimal allowed part size is 5 MB.");
        auto id = startMultipartUpload(resource, headers, contentType, storageClass, expires);
        scope(failure)
        {
            logWarn("aborting multipart upload for resource=%s, uploadId=%s", resource, id);
            try
            {
                abortMultipartUpload(resource, id);
            }
            catch(Exception e)
            {
                logWarn(e.msg);
            }
        }

        auto buf = uninitializedArray!(ubyte[])(partSize);
        auto etags = appender!(Tuple!(string, size_t)[]);

        size_t least = input.leastSize;
        for(size_t part = 1;;part++)
        {
            size_t length;
            do
            {
                auto newLength = least + length;
                if(newLength > buf.length)
                    newLength = buf.length;
                input.read(buf[length .. newLength]);
                length = newLength;
                least = input.leastSize;
            }
            while(least && length < buf.length);
            logDebug("buf.length = %s", buf.length);
            logDebug("least = %s", least);
            logDebug("multipartUpload: sending %s bytes for part %s ...", length, part);
            auto etag = uploadPart(resource, id, part, createMemoryStream(buf[0 .. length], false), contentType, chunkSize);
            etags.put(tuple(etag, part));
            if(least == 0)
                break;
        }
        enforce(etags.data, "At least one part should be uploaded.");
        completeMultipartUpload(resource, id, etags.data);
    }

    string uploadPart(
        string resource,
        string id,
        size_t part,
        RandomAccessStream input,
        string contentType = "application/octet-stream",
        size_t chunkSize = 512*1024,
        )
    {
        import vibe.d : HTTPMethod, logDebug;
        import std.conv : to;
        string[string] queryParameters = [
            "partNumber": part.to!string,
            "uploadId": id,
        ];
        InetHeaderMap headers;
        headers["Content-Type"] = contentType;
        logDebug("uploadPart: doUpload ...");
        auto httpResp = doUpload(HTTPMethod.PUT, resource, queryParameters, headers, null, input, chunkSize);
        logDebug("uploadPart: doUpload finished.");
        httpResp.dropBody();
        auto etag = httpResp.headers["ETag"];
        httpResp.destroy();
        logDebug("uploadPart: finished.");
        return etag;
    }

    string startMultipartUpload(
        string resource,
        InetHeaderMap headers = InetHeaderMap.init,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        SysTime expires = SysTime.init,
        )
    {
        import vibe.d : HTTPMethod;
        import std.conv : to;
        headers["Content-Type"] = contentType;
        headers["x-amz-storage-class"] = storageClass.to!string;
        string[] signedHeaders = ["x-amz-storage-class"];
        if(expires != SysTime.init)
        {
            expires.fracSecs = expires.fracSecs.init;
            headers["Expires"] = expires.toISOString; // HTTP format is different. So, we need to check if it is works.
        }
        auto httpResp = doRequest(HTTPMethod.POST, resource, ["uploads":null], headers);
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        auto document = readXML(httpResp);
        auto id = document.querySelector("InitiateMultipartUploadResult UploadId").safeInnerText;
        return id;
    }

    void completeMultipartUpload(
        string resource,
        string id,
        in Tuple!(string, size_t)[] parts,
        InetHeaderMap headers = InetHeaderMap.init,
        )
    {
        import vibe.d : HTTPMethod;
        import std.format;
        import std.array: appender;
        auto app = appender!(char[]);
        app.put(`<CompleteMultipartUpload>`);
        FormatSpec!char fmt;
        foreach(ref part; parts)
        {
            app.put(`<Part><PartNumber>`);
            app.formatValue(part[1], fmt);
            app.put(`</PartNumber><ETag>`);
            app.put(part[0]);
            app.put(`</ETag></Part>`);
        }
        app.put(`</CompleteMultipartUpload>`);
        auto httpResp = doRequest(HTTPMethod.POST, resource, ["uploadId":id], headers, cast(ubyte[])app.data);
        httpResp.dropBody();
        httpResp.destroy();
    }

    void abortMultipartUpload(string resource, string id)
    {
        import vibe.d : HTTPMethod;
        auto httpResp = doRequest(HTTPMethod.DELETE, resource, ["uploadId":id], InetHeaderMap.init);
        httpResp.dropBody();
        httpResp.destroy();
    }

    +/
}