Skip to content

Commit

Permalink
Start work on an Object API for representing a remote cloud object (#20)
Browse files Browse the repository at this point in the history
* Start work on an Object API for representing a remote cloud object

* Cleanup Object and add IOObject

* oops

* More cleanup for Object API

* cleanup
  • Loading branch information
quinnj authored Nov 2, 2022
1 parent bfe14f9 commit 1cd9f39
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 37 deletions.
15 changes: 4 additions & 11 deletions src/CloudStore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import CloudBase: AWS, Azure, CloudTest
# for specific clouds
module API

export Object, ResponseBodyType, RequestBodyType
export Object, IOObject, ResponseBodyType, RequestBodyType

using HTTP, CodecZlib, Mmap
import WorkerUtilities: OrderedSynchronizer
Expand All @@ -27,23 +27,16 @@ const RequestBodyType = Union{AbstractVector{UInt8}, String, IO}
asArray(x::Array) = x
asArray(x) = [x]

struct Object
store::AbstractStore
key::String
lastModified::String
eTag::String
size::Int
storageClass::String
end

etag(x) = strip(x, '"')
makeURL(x::AbstractStore, key) = joinpath(x.baseurl, lstrip(key, '/'))

include("object.jl")

function cloudName end
function maxListKeys end
function listMaxKeysQuery end
function continuationToken end
function listObjects end
makeURL(x::AbstractStore, key) = joinpath(x.baseurl, lstrip(key, '/'))
function getObject end
function headObject end
include("get.jl")
Expand Down
16 changes: 8 additions & 8 deletions src/blobs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,36 @@ const Credentials = Azure.Credentials

API.cloudName(::Container) = "Blob Storage"

object(b, x) = Object(b, x["Name"], x["Properties"]["Last-Modified"], API.etag(x["Properties"]["Etag"]), parse(Int, x["Properties"]["Content-Length"]), "")
object(b, creds, x) = Object(b, creds, x["Name"], parse(Int, x["Properties"]["Content-Length"]), API.etag(x["Properties"]["Etag"]))

API.maxListKeys(::Container) = 5000
API.listMaxKeysQuery(::Container) = "maxresults"
API.continuationToken(::Container) = "marker"

function API.listObjects(x::Container, query, result=nothing; kw...)
function API.listObjects(x::Container, query, result=nothing; credentials=nothing, kw...)
query["restype"] = "container"
query["comp"] = "list"
result = xml_dict(String(Azure.get(x.baseurl; query, kw...).body))["EnumerationResults"]
result = xml_dict(String(Azure.get(x.baseurl; query, credentials, kw...).body))["EnumerationResults"]
if isempty(result["Blobs"])
return (Object[], "")
end
contents = map(y -> object(x, y), API.asArray(result["Blobs"]["Blob"]))
contents = map(y -> object(x, credentials, y), API.asArray(result["Blobs"]["Blob"]))
return (contents, result["NextMarker"])
end

list(x::Container; kw...) = API.listObjectsImpl(x; kw...)

API.getObject(x::Container, url, headers; kw...) = Azure.get(url, headers; kw...)

get(x::Object, args...; kw...) = get(x.store, x.key, args...; kw...)
get(x::Object, args...; kw...) = get(x.store, x.key, args...; credentials=x.credentials, kw...)
get(args...; kw...) = API.getObjectImpl(args...; kw...)

API.headObject(x::Container, url, headers; kw...) = Azure.head(url; headers, kw...)
head(x::Object; kw...) = head(x.store, x.key; kw...)
head(x::Object; kw...) = head(x.store, x.key; credentials=x.credentials, kw...)
head(x::Container, key::String; kw...) = API.headObjectImpl(x, key; kw...)

put(args...; kw...) = API.putObjectImpl(args...; kw...)
put(x::Object; kw...) = put(x.store, x.key; kw...)
put(x::Object; kw...) = put(x.store, x.key; credentials=x.credentials, kw...)

API.putObject(x::Container, key, body; kw...) = Azure.put(API.makeURL(x, key), ["x-ms-blob-type" => "BlockBlob"], body; kw...)

Expand All @@ -57,7 +57,7 @@ function API.completeMultipartUpload(x::Container, url, eTags, uploadId; kw...)
end

delete(x::Container, key::String; kw...) = Azure.delete(API.makeURL(x, key); kw...)
delete(x::Object; kw...) = delete(x.store, x.key; kw...)
delete(x::Object; kw...) = delete(x.store, x.key; credentials=x.credentials, kw...)

for func in (:list, :get, :head, :put, :delete)
@eval function $func(url::AbstractString, args...; kw...)
Expand Down
66 changes: 66 additions & 0 deletions src/object.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import CloudBase: AbstractStore, CloudCredentials, AWS, Azure

struct Object{T <: AbstractStore}
store::T
credentials::Union{Nothing, AWS.Credentials, Azure.Credentials}
key::String
size::Int
eTag::String
end

Object(
store::AbstractStore,
creds::Union{Nothing, AWS.Credentials, Azure.Credentials},
key::AbstractString,
size::Integer,
eTag::AbstractString) = Object(store, creds, String(key), Int(size), String(eTag))

function Object(store::AbstractStore, key::String; credentials::Union{CloudCredentials, Nothing}=nothing)
url = makeURL(store, key)
resp = API.headObject(store, url, HTTP.Headers(); credentials=credentials)
size = parse(Int, HTTP.header(resp, "Content-Length", "0"))
#TODO: get eTag
et = etag(HTTP.header(resp, "ETag", ""))
return Object(store, credentials, key, size, String(et))
end

Base.length(x::Object) = x.size

function Base.copyto!(dest::AbstractVector{UInt8}, doff::Integer, src::Object, soff::Integer, n::Integer)
# validate arguments
0 < doff <= length(dest) || throw(BoundsError(dest, doff))
0 < soff <= length(src) || throw(BoundsError(src, soff))
(soff + n) - 1 <= length(src) || throw(ArgumentError("requested number of bytes (`$n`) would exceed source length"))
(doff + n) - 1 <= length(dest) || throw(ArgumentError("requested number of bytes (`$n`) would exceed destination length"))
return unsafe_copyto!(dest, doff, src, soff, n)
end

function Base.unsafe_copyto!(dest::AbstractVector{UInt8}, doff::Integer, src::Object, soff::Integer, n::Integer)
headers = HTTP.Headers()
HTTP.setheader(headers, contentRange((soff - 1):(soff + n - 2)))
url = makeURL(src.store, src.key)
# avoid extra copy here by passing dest to be written to directly
resp = getObject(src.store, url, headers; credentials=src.credentials)
copyto!(dest, doff, resp.body)
return n
end

mutable struct IOObject{T <: Object} <: IO
object::T
pos::Int
end

IOObject(x::Object) = IOObject(x, 1)
IOObject(store::AbstractStore, key::String; credentials::Union{CloudCredentials, Nothing}=nothing) =
IOObject(Object(store, key; credentials))

Base.eof(x::IOObject) = x.pos > length(x.object)

function Base.readbytes!(x::IOObject, dest::AbstractVector{UInt8}, n::Integer=length(dest))
n = min(n, length(dest))
n = min(n, length(x.object) - x.pos + 1)
n == 0 && return dest
Base.unsafe_copyto!(dest, 1, x.object, x.pos, n)
x.pos += n
return dest
end
14 changes: 7 additions & 7 deletions src/put.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
partSize::Int=MULTIPART_SIZE,
batchSize::Int=defaultBatchSize(),
allowMultipart::Bool=true,
compress::Bool=false, kw...)
compress::Bool=false, credentials=nothing, kw...)

N = nbytes(in)
if N <= multipartThreshold || !allowMultipart
body = prepBody(in, compress)
resp = putObject(x, key, body; kw...)
return Object(x, key, "", etag(HTTP.header(resp, "ETag")), N, "")
resp = putObject(x, key, body; credentials, kw...)
return Object(x, credentials, key, N, etag(HTTP.header(resp, "ETag")))
end
# multipart upload
uploadState = startMultipartUpload(x, key; kw...)
uploadState = startMultipartUpload(x, key; credentials, kw...)
url = makeURL(x, key)
eTags = String[]
sync = OrderedSynchronizer(1)
Expand All @@ -74,7 +74,7 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
part = _read(body, partSize)
let n=n, part=part
Threads.@spawn begin
eTag = uploadPart(x, url, part, n, uploadState; kw...)
eTag = uploadPart(x, url, part, n, uploadState; credentials, kw...)
let eTag=eTag
# we synchronize the eTags here because the order matters
# for the final call to completeMultipartUpload
Expand All @@ -92,6 +92,6 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
if in isa String
close(body)
end
eTag = completeMultipartUpload(x, url, eTags, uploadState; kw...)
return Object(x, key, "", eTag, N, "")
eTag = completeMultipartUpload(x, url, eTags, uploadState; credentials, kw...)
return Object(x, credentials, key, N, eTag)
end
16 changes: 8 additions & 8 deletions src/s3.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,35 @@ const Credentials = AWS.Credentials

API.cloudName(::Bucket) = "S3"

object(b::Bucket, x) = Object(b, x["Key"], x["LastModified"], API.etag(x["ETag"]), parse(Int, x["Size"]), x["StorageClass"])
object(b::Bucket, creds, x) = Object(b, creds, x["Key"], parse(Int, x["Size"]), API.etag(x["ETag"]))

API.maxListKeys(::Bucket) = 1000
API.listMaxKeysQuery(::Bucket) = "max-keys"
API.continuationToken(::Bucket) = "continuation-token"

function API.listObjects(x::Bucket, query, result=nothing; kw...)
function API.listObjects(x::Bucket, query, result=nothing; credentials=nothing, kw...)
query["list-type"] = "2"
result = xml_dict(String(AWS.get(x.baseurl; query, service="s3", kw...).body))["ListBucketResult"]
result = xml_dict(String(AWS.get(x.baseurl; credentials, query, service="s3", kw...).body))["ListBucketResult"]
if parse(Int, result["KeyCount"]) == 0
return (Object[], "")
end
contents = map(y -> object(x, y), API.asArray(result["Contents"]))
contents = map(y -> object(x, credentials, y), API.asArray(result["Contents"]))
return (contents, result["IsTruncated"] == "true" ? result["NextContinuationToken"] : "")
end

list(x::Bucket; kw...) = API.listObjectsImpl(x; kw...)

API.getObject(x::Bucket, url, headers; kw...) = AWS.get(url, headers; service="s3", kw...)

get(x::Object, out::ResponseBodyType=nothing; kw...) = get(x.store, x.key, out; kw...)
get(x::Object, out::ResponseBodyType=nothing; kw...) = get(x.store, x.key, out; credentials=x.credentials, kw...)
get(args...; kw...) = API.getObjectImpl(args...; kw...)

API.headObject(x::Bucket, url, headers; kw...) = AWS.head(url; headers, service="s3", kw...)
head(x::Object; kw...) = head(x.store, x.key; kw...)
head(x::Object; kw...) = head(x.store, x.key; credentials=x.credentials, kw...)
head(x::Bucket, key::String; kw...) = API.headObjectImpl(x, key; kw...)

put(args...; kw...) = API.putObjectImpl(args...; kw...)
put(x::Object; kw...) = put(x.store, x.key; kw...)
put(x::Object; kw...) = put(x.store, x.key; credentials=x.credentials, kw...)

API.putObject(x::Bucket, key, body; kw...) = AWS.put(API.makeURL(x, key), [], body; service="s3", kw...)

Expand All @@ -58,7 +58,7 @@ function API.completeMultipartUpload(x::Bucket, url, eTags, uploadId; kw...)
end

delete(x::Bucket, key; kw...) = AWS.delete(API.makeURL(x, key); service="s3", kw...)
delete(x::Object; kw...) = delete(x.store, x.key; kw...)
delete(x::Object; kw...) = delete(x.store, x.key; credentials=x.credentials, kw...)

for func in (:list, :get, :head, :put, :delete)
@eval function $func(url::AbstractString, args...; region=nothing, nowarn::Bool=false, kw...)
Expand Down
25 changes: 22 additions & 3 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ end
]
for (url, parts) in azure
ok, host, account, container, blob = CloudStore.parseAzureAccountContainerBlob(url; parseLocal=true)
@show url, ok, host, account, container, blob
@test ok
@test host == parts[2]
@test account == parts[3]
Expand Down Expand Up @@ -255,12 +254,32 @@ end
]
for (url, parts) in s3
ok, accelerate, host, bucket, reg, key = CloudStore.parseAWSBucketRegionKey(url; parseLocal=true)
@show url, ok, accelerate, host, bucket, reg, key
@test ok
@test accelerate == parts[2]
@test host == parts[3]
@test bucket == parts[4]
@test reg == parts[5]
@test key == parts[6]
end
end
end

@testset "CloudStore.Object API" begin
Minio.with(; debug=true) do conf
credentials, bucket = conf
multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB
S3.put(bucket, "test.csv", codeunits(multicsv); credentials)
obj = CloudStore.Object(bucket, "test.csv"; credentials)
@test length(obj) == sizeof(multicsv)
buf = Vector{UInt8}(undef, 1000)
copyto!(buf, 1, obj, 1, 1000)
@test buf == view(codeunits(multicsv), 1:1000)

ioobj = CloudStore.IOObject(obj)
i = 1
while !eof(ioobj)
readbytes!(ioobj, buf, 1000)
@test buf == view(codeunits(multicsv), i:min(i+999, length(multicsv)))
i += 1000
end
end
end

0 comments on commit 1cd9f39

Please sign in to comment.