1 module aws.s3;
2 
3 import aws.aws;
4 import aws.credentials;
5 import aws.sigv4;
6 
7 import std.typecons: Tuple, tuple;
8 import std.string : toLower;
9 
/++
Storage classes understood by this client.

Every member's name is identical to its string value, so converting with
`std.conv.to` in either direction (`"GLACIER".to!StorageClass`,
`StorageClass.GLACIER.to!string`) yields the text S3 uses on the wire.

The first three members are the original set; the remaining ones were added so
that parsing a listing which contains objects in those classes no longer
throws a `ConvException`.
+/
enum StorageClass: string
{
    STANDARD = "STANDARD",
    REDUCED_REDUNDANCY = "REDUCED_REDUNDANCY",
    GLACIER = "GLACIER",
    STANDARD_IA = "STANDARD_IA",
    ONEZONE_IA = "ONEZONE_IA",
    INTELLIGENT_TIERING = "INTELLIGENT_TIERING",
    DEEP_ARCHIVE = "DEEP_ARCHIVE",
    GLACIER_IR = "GLACIER_IR",
    OUTPOSTS = "OUTPOSTS",
    SNOW = "SNOW",
    EXPRESS_ONEZONE = "EXPRESS_ONEZONE",
}
16 
/++
One page of a bucket listing, as filled in by `S3.list` from the XML response.
+/
struct BucketListResult
{
    /// A single object entry of the listing.
    static struct S3Resource
    {
        /// Owner of an object. `S3.list` leaves this at its default value.
        static struct Owner
        {
            string id;
            string displayName;
        }

        /// Object key, exactly as it appears in the response.
        string key;
        /// Last-modified timestamp text, exactly as it appears in the response.
        /// The identifier is misspelled; it is kept so existing callers compile.
        string lastModfied;
        /// Correctly spelled name for `lastModfied`; both refer to the same field.
        alias lastModified = lastModfied;
        /// ETag text, exactly as it appears in the response.
        string etag;
        /// Object size parsed from the response.
        ulong size;
        /// See `Owner`.
        Owner owner;
        /// Storage class parsed from the response.
        StorageClass storageClass;
    }

    /// Bucket name reported by the response.
    string name;
    /// Prefix reported by the response.
    string prefix;
    /// Position this page started from, as reported by the response (may be empty).
    string marker;
    /// Token to pass as `marker` to fetch the next page; only set when `isTruncated`.
    string nextMarker;
    /// Object entries of this page.
    S3Resource[] resources;
    /// Common prefixes of this page.
    string[] commonPrefixes;
    /// Page size limit reported by the response.
    uint maxKeys;
    /// Whether the response reports that more pages are available.
    bool isTruncated;
}
44 
45 // auto listFilesRecursive(S3 client, string path = null)
46 // {
47 //     import std.algorithm : endsWith;
48 //     import std.range : empty;
49 //     if(!path.empty && !path.endsWith("/"))
50 //         path ~= "/";
51 //     return S3Resources(client, null, path);
52 // }
53 
54 // auto listFiles(S3 client, string path = null)
55 // {
56 //     import std.algorithm : endsWith;
57 //     import std.range : empty;
58 //     if(!path.empty && !path.endsWith("/"))
59 //         path ~= "/";
60 //     return S3Resources(client, "/", path);
61 // }
62 
63 // auto listFolders(S3 client, string path = null)
64 // {
65 //     import std.range : empty;
66 //     import std.algorithm : endsWith;
67 //     if(!path.empty && !path.endsWith("/"))
68 //         path ~= "/";
69 //     return S3Prefixes(client, "/", path);
70 // }
71 
72 // struct S3Resources
73 // {
74 //     import std.range.primitives;
75 //     mixin _S3Common;
76 
77 //     auto front() @property
78 //     {
79 //         assert(!empty);
80 //         return res.resources.front;
81 //     }
82 
83 //     auto empty() const @property
84 //     {
85 //         return res.resources.empty;
86 //     }
87 
88 //     auto popFront()
89 //     {
90 //         assert(!empty);
91 //         res.resources.popFront;
92 //         if(empty && res.isTruncated)
93 //         {
94 //             next;
95 //         }
96 //     }
97 
98 // }
99 
100 // struct S3Prefixes
101 // {
102 //     import std.range.primitives;
103 //     mixin _S3Common;
104 
105 //     auto front() @property
106 //     {
107 //         assert(!empty);
108 //         return res.commonPrefixes.front;
109 //     }
110 
111 //     auto empty() const @property
112 //     {
113 //         return res.commonPrefixes.empty;
114 //     }
115 
116 //     auto popFront()
117 //     {
118 //         assert(!empty);
119 //         res.commonPrefixes.popFront;
120 //         if(empty && res.isTruncated)
121 //         {
122 //             next;
123 //         }
124 //     }
125 // }
126 
127 // private mixin template _S3Common()
128 // {
129 //     private S3 client;
130 //     private BucketListResult res;
131 //     private string delimiter;
132 //     private string prefix;
133 //     private uint maxKeys;
134 
135 //     private void next()
136 //     {
137 //         res = client.list(delimiter, prefix, res.nextMarker, maxKeys);
138 //     }
139 
140 //     @disable this();
141 
142 //     this(S3 client, string delimiter, string prefix, uint maxKeys = 0)
143 //     {
144 //         this.client = client;
145 //         this.delimiter = delimiter;
146 //         this.prefix = prefix;
147 //         this.maxKeys = maxKeys;
148 //         next();
149 //     }
150 
151 //     auto save() @property
152 //     {
153 //         return this;
154 //     }
155 // }
156 
157 import arsd.dom;
158 
/++
Client for a subset of the Amazon S3 REST API, layered on the project's
`RESTClient`. Requests go through the inherited `doRequest` / `doUpload`
helpers and every response is handed to `checkForError`.
+/
class S3 : RESTClient
{
    /++
    Constructs a client for the "s3" service.

    Params:
        endpoint = forwarded to `RESTClient`
        region = forwarded to `RESTClient`
        credsSource = forwarded to `RESTClient`
    +/
    this(string endpoint, string region, AWSCredentialSource credsSource) nothrow @safe {
        super(endpoint, region, "s3", credsSource);
    }

    /++
    Sends a body-less `PUT` for `bucket` and passes the response to
    `checkForError`.

    Params:
        bucket = name of the bucket, used as the request path
    +/
    auto createBucket(string bucket) {
        string[string] headers;
        string[] signedHeaders = null;
        ubyte[] input = null;
        auto resp = doUpload("PUT", bucket, null, headers, signedHeaders, input, 1024);
        checkForError(resp);
    }

    /++
    Fetches one page of a bucket listing (`list-type=2`) and parses it into a
    `BucketListResult`.

    Params:
        bucket = bucket to list; the request path is `bucket ~ "/"`
        delimiter = sent as `delimiter` when not null
        prefix = sent as `prefix`; a null prefix is sent as the empty string
        marker = sent as `continuation-token` when not null; pass the
                 `nextMarker` of the previous page to continue a listing
        maxKeys = sent as `max-keys` when non-zero; must not exceed 1000

    Returns: the parsed page. `nextMarker` is only filled in when the
    response says the listing is truncated.
    +/
    auto list(string bucket, string delimiter = null, string prefix = null, string marker = null, uint maxKeys = 0)
    {
        assert(maxKeys <= 1000);

        import std.conv : to;

        string[string] headers;
        string[string] queryParameters;
        queryParameters["list-type"] = "2";
        // NOTE(review): with encoding-type=url S3 is documented to return keys
        // and prefixes URL-encoded; nothing below decodes them - confirm that
        // callers expect the encoded form.
        if (bucket !is null && bucket != "")
            queryParameters["encoding-type"] = "url";

        if (delimiter !is null)
            queryParameters["delimiter"] = delimiter;

        if (prefix !is null)
            queryParameters["prefix"] = prefix;
        else
            queryParameters["prefix"] = "";

        if (marker !is null)
            queryParameters["continuation-token"] = marker;

        if (maxKeys)
            queryParameters["max-keys"] = maxKeys.to!string;

        auto resp = doRequest("GET", bucket~"/", queryParameters, headers);
        checkForError(resp);
        auto response = readXML(resp);

        BucketListResult result;
        result.name = response.querySelector("listbucketresult name").safeInnerText;
        result.prefix = response.querySelector("listbucketresult prefix").safeInnerText;
        result.marker = response.querySelector("listbucketresult marker").safeInnerText;
        // A list-type=2 response echoes the request token in a
        // continuationtoken element rather than in marker, so fall back to it.
        if (result.marker.length == 0)
            result.marker = response.querySelector("listbucketresult continuationtoken").safeInnerText;
        result.maxKeys = response.querySelector("listbucketresult maxkeys").safeInnerText.to!uint;
        result.isTruncated = response.querySelector("listbucketresult istruncated").safeInnerText.toLower.to!bool;

        if (result.isTruncated)
            result.nextMarker = response.querySelector("listbucketresult nextcontinuationtoken").safeInnerText;

        auto entries = response.querySelectorAll("listbucketresult contents");

        if (entries.length)
        {
            // Reserve exactly what this page needs instead of a fixed 1000 slots.
            result.resources.reserve(entries.length);
            foreach (node; entries)
            {
                BucketListResult.S3Resource entry;

                entry.key = node.querySelector("key").safeInnerText;
                // Lower-case like every other selector in this method.
                entry.lastModfied = node.querySelector("lastmodified").safeInnerText;
                entry.etag = node.querySelector("etag").safeInnerText;
                entry.size = node.querySelector("size").safeInnerText.to!ulong;
                entry.storageClass = node.querySelector("storageclass").safeInnerText.to!StorageClass;

                result.resources ~= entry;
            }
        }

        auto prefixes = response.querySelectorAll("listbucketresult commonprefixes prefix");
        result.commonPrefixes.reserve(prefixes.length);
        foreach (node; prefixes)
            result.commonPrefixes ~= node.innerText;

        return result;
    }

    /++
    Uploads `input` with a `PUT` to `bucket ~ "/" ~ resource` and passes the
    response to `checkForError`.

    Params:
        bucket = target bucket
        resource = object key inside the bucket
        input = data source handed to `doUpload`
        contentType = value of the `content-type` header
        storageClass = value of the `x-amz-storage-class` header, which is
                       also listed as a signed header
        chunkSize = chunk size handed to `doUpload`
    +/
    void upload(InputStream)(
                string bucket,
                string resource,
                InputStream input,
                string contentType = "application/octet-stream",
                StorageClass storageClass = StorageClass.STANDARD,
                size_t chunkSize = 512*1024,
                )
    {
        import std.conv : to;
        string[string] headers;
        headers["content-type"] = contentType;
        headers["x-amz-storage-class"] = storageClass.to!string;
        string[] signedHeaders = ["x-amz-storage-class"];
        auto resp = doUpload("PUT", bucket~"/"~resource, null, headers, signedHeaders, input, chunkSize);
        checkForError(resp);
    }

    /++
    Issues a `GET` for `bucket ~ "/" ~ resource`.

    Params:
        bucket = bucket holding the object
        resource = object key inside the bucket
        queryParameters = extra query parameters for the request
        headers = extra request headers

    Returns: the response object, after it has been passed to `checkForError`.
    +/
    auto download(string bucket, string resource,
                  string[string] queryParameters = null, string[string] headers = null)
    {
        auto resp = doRequest("GET", bucket~"/"~resource, queryParameters, headers);
        checkForError(resp);
        return resp;
    }

    /++
    Issues a `HEAD` for `bucket ~ "/" ~ resource`.

    Params:
        bucket = bucket holding the object
        resource = object key inside the bucket
        queryParameters = extra query parameters for the request
        headers = extra request headers

    Returns: the `responseHeaders` of the response, after the response has
    been passed to `checkForError`.
    +/
    auto info(string bucket, string resource, string[string] queryParameters = null, string[string] headers = null)
    {
        auto resp = doRequest("HEAD", bucket~"/"~resource, queryParameters, headers);
        checkForError(resp);
        return resp.responseHeaders;
    }
/+
    /++
    On_failure: aborts multipart upload.
    +/
    void multipartUpload(
        string resource,
        scope InputStream input,
        InetHeaderMap headers = InetHeaderMap.init,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        SysTime expires = SysTime.init,
        size_t chunkSize = 512*1024,
        size_t partSize = 5*1024*1024,
        )
    {
        import vibe.d : logDebug, logWarn, createMemoryStream;
        import std.array: appender, uninitializedArray;
        import std.algorithm.comparison: min;
        import std.exception : enforce;
        logDebug("multipartUpload for %s ...", resource);
        enforce(partSize >= 5 * 1024 * 1024, "multipartUpload: minimal allowed part size is 5 MB.");
        auto id = startMultipartUpload(resource, headers, contentType, storageClass, expires);
        scope(failure)
        {
            logWarn("aborting multipart upload for resource=%s, uploadId=%s", resource, id);
            try
            {
                abortMultipartUpload(resource, id);
            }
            catch(Exception e)
            {
                logWarn(e.msg);
            }
        }

        auto buf = uninitializedArray!(ubyte[])(partSize);
        auto etags = appender!(Tuple!(string, size_t)[]);

        size_t least = input.leastSize;
        for(size_t part = 1;;part++)
        {
            size_t length;
            do
            {
                auto newLength = least + length;
                if(newLength > buf.length)
                    newLength = buf.length;
                input.read(buf[length .. newLength]);
                length = newLength;
                least = input.leastSize;
            }
            while(least && length < buf.length);
            logDebug("buf.length = %s", buf.length);
            logDebug("least = %s", least);
            logDebug("multipartUpload: sending %s bytes for part %s ...", length, part);
            auto etag = uploadPart(resource, id, part, createMemoryStream(buf[0 .. length], false), contentType, chunkSize);
            etags.put(tuple(etag, part));
            if(least == 0)
                break;
        }
        enforce(etags.data, "At least one part should be uploaded.");
        completeMultipartUpload(resource, id, etags.data);
    }

    string uploadPart(
        string resource,
        string id,
        size_t part,
        RandomAccessStream input,
        string contentType = "application/octet-stream",
        size_t chunkSize = 512*1024,
        )
    {
        import vibe.d : HTTPMethod, logDebug;
        import std.conv : to;
        string[string] queryParameters = [
            "partNumber": part.to!string,
            "uploadId": id,
        ];
        InetHeaderMap headers;
        headers["Content-Type"] = contentType;
        logDebug("uploadPart: doUpload ...");
        auto httpResp = doUpload(HTTPMethod.PUT, resource, queryParameters, headers, null, input, chunkSize);
        logDebug("uploadPart: doUpload finished.");
        httpResp.dropBody();
        auto etag = httpResp.headers["ETag"];
        httpResp.destroy();
        logDebug("uploadPart: finished.");
        return etag;
    }

    string startMultipartUpload(
        string resource,
        InetHeaderMap headers = InetHeaderMap.init,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        SysTime expires = SysTime.init,
        )
    {
        import vibe.d : HTTPMethod;
        import std.conv : to;
        headers["Content-Type"] = contentType;
        headers["x-amz-storage-class"] = storageClass.to!string;
        string[] signedHeaders = ["x-amz-storage-class"];
        if(expires != SysTime.init)
        {
            expires.fracSecs = expires.fracSecs.init;
            headers["Expires"] = expires.toISOString; // HTTP format is different. So, we need to check if it works.
        }
        auto httpResp = doRequest(HTTPMethod.POST, resource, ["uploads":null], headers);
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        auto document = readXML(httpResp);
        auto id = document.querySelector("InitiateMultipartUploadResult UploadId").safeInnerText;
        return id;
    }

    void completeMultipartUpload(
        string resource,
        string id,
        in Tuple!(string, size_t)[] parts,
        InetHeaderMap headers = InetHeaderMap.init,
        )
    {
        import vibe.d : HTTPMethod;
        import std.format;
        import std.array: appender;
        auto app = appender!(char[]);
        app.put(`<CompleteMultipartUpload>`);
        FormatSpec!char fmt;
        foreach(ref part; parts)
        {
            app.put(`<Part><PartNumber>`);
            app.formatValue(part[1], fmt);
            app.put(`</PartNumber><ETag>`);
            app.put(part[0]);
            app.put(`</ETag></Part>`);
        }
        app.put(`</CompleteMultipartUpload>`);
        auto httpResp = doRequest(HTTPMethod.POST, resource, ["uploadId":id], headers, cast(ubyte[])app.data);
        httpResp.dropBody();
        httpResp.destroy();
    }

    void abortMultipartUpload(string resource, string id)
    {
        import vibe.d : HTTPMethod;
        auto httpResp = doRequest(HTTPMethod.DELETE, resource, ["uploadId":id], InetHeaderMap.init);
        httpResp.dropBody();
        httpResp.destroy();
    }

    +/
}