Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start work on an Object API for representing a remote cloud object #20

Merged
merged 5 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions src/CloudStore.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import CloudBase: AWS, Azure, CloudTest
# for specific clouds
module API

export Object, ResponseBodyType, RequestBodyType
export Object, IOObject, ResponseBodyType, RequestBodyType

using HTTP, CodecZlib, Mmap
import WorkerUtilities: OrderedSynchronizer
Expand All @@ -27,23 +27,16 @@ const RequestBodyType = Union{AbstractVector{UInt8}, String, IO}
asArray(x::Array) = x
asArray(x) = [x]

struct Object
store::AbstractStore
key::String
lastModified::String
eTag::String
size::Int
storageClass::String
end

etag(x) = strip(x, '"')
makeURL(x::AbstractStore, key) = joinpath(x.baseurl, lstrip(key, '/'))

include("object.jl")

function cloudName end
function maxListKeys end
function listMaxKeysQuery end
function continuationToken end
function listObjects end
makeURL(x::AbstractStore, key) = joinpath(x.baseurl, lstrip(key, '/'))
function getObject end
function headObject end
include("get.jl")
Expand Down
16 changes: 8 additions & 8 deletions src/blobs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,36 @@ const Credentials = Azure.Credentials

API.cloudName(::Container) = "Blob Storage"

object(b, x) = Object(b, x["Name"], x["Properties"]["Last-Modified"], API.etag(x["Properties"]["Etag"]), parse(Int, x["Properties"]["Content-Length"]), "")
object(b, creds, x) = Object(b, creds, x["Name"], parse(Int, x["Properties"]["Content-Length"]), API.etag(x["Properties"]["Etag"]))

API.maxListKeys(::Container) = 5000
API.listMaxKeysQuery(::Container) = "maxresults"
API.continuationToken(::Container) = "marker"

function API.listObjects(x::Container, query, result=nothing; kw...)
function API.listObjects(x::Container, query, result=nothing; credentials=nothing, kw...)
query["restype"] = "container"
query["comp"] = "list"
result = xml_dict(String(Azure.get(x.baseurl; query, kw...).body))["EnumerationResults"]
result = xml_dict(String(Azure.get(x.baseurl; query, credentials, kw...).body))["EnumerationResults"]
if isempty(result["Blobs"])
return (Object[], "")
end
contents = map(y -> object(x, y), API.asArray(result["Blobs"]["Blob"]))
contents = map(y -> object(x, credentials, y), API.asArray(result["Blobs"]["Blob"]))
return (contents, result["NextMarker"])
end

list(x::Container; kw...) = API.listObjectsImpl(x; kw...)

API.getObject(x::Container, url, headers; kw...) = Azure.get(url, headers; kw...)

get(x::Object, args...; kw...) = get(x.store, x.key, args...; kw...)
get(x::Object, args...; kw...) = get(x.store, x.key, args...; credentials=x.credentials, kw...)
get(args...; kw...) = API.getObjectImpl(args...; kw...)

API.headObject(x::Container, url, headers; kw...) = Azure.head(url; headers, kw...)
head(x::Object; kw...) = head(x.store, x.key; kw...)
head(x::Object; kw...) = head(x.store, x.key; credentials=x.credentials, kw...)
head(x::Container, key::String; kw...) = API.headObjectImpl(x, key; kw...)

put(args...; kw...) = API.putObjectImpl(args...; kw...)
put(x::Object; kw...) = put(x.store, x.key; kw...)
put(x::Object; kw...) = put(x.store, x.key; credentials=x.credentials, kw...)

API.putObject(x::Container, key, body; kw...) = Azure.put(API.makeURL(x, key), ["x-ms-blob-type" => "BlockBlob"], body; kw...)

Expand All @@ -57,7 +57,7 @@ function API.completeMultipartUpload(x::Container, url, eTags, uploadId; kw...)
end

delete(x::Container, key::String; kw...) = Azure.delete(API.makeURL(x, key); kw...)
delete(x::Object; kw...) = delete(x.store, x.key; kw...)
delete(x::Object; kw...) = delete(x.store, x.key; credentials=x.credentials, kw...)

for func in (:list, :get, :head, :put, :delete)
@eval function $func(url::AbstractString, args...; kw...)
Expand Down
66 changes: 66 additions & 0 deletions src/object.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import CloudBase: AbstractStore, CloudCredentials, AWS, Azure

# A lazily-fetched handle to a single remote cloud object (e.g. an S3 key or an
# Azure blob). Holds everything needed to issue further requests for the
# object's bytes: the store it lives in, the credentials used to access it, its
# key, and the size/ETag learned from a prior HEAD or LIST response.
struct Object{T <: AbstractStore}
    # Concretely typed via the type parameter so dispatch on the store kind
    # (e.g. `Object{AWS.Bucket}`) specializes; reviewers suggested adding
    # conveniences like `const S3Object = Object{AWS.Bucket}` — TODO confirm.
    store::T
    # `nothing` means anonymous/ambient access; otherwise the credential type
    # should match the store's cloud (AWS vs Azure).
    credentials::Union{Nothing, AWS.Credentials, Azure.Credentials}
    key::String
    size::Int       # object size in bytes, as reported by the service
    eTag::String    # ETag with surrounding quotes stripped (see `etag`)
end

"""
    Object(store, creds, key, size, eTag)

Convenience constructor normalizing loosely-typed inputs (any
`AbstractString`/`Integer`) to the concrete field types of `Object`.
"""
function Object(
        store::AbstractStore,
        creds::Union{Nothing, AWS.Credentials, Azure.Credentials},
        key::AbstractString,
        size::Integer,
        eTag::AbstractString,
    )
    return Object(store, creds, String(key), Int(size), String(eTag))
end

"""
    Object(store, key; credentials=nothing)

Construct an `Object` by issuing a HEAD request for `key` against `store`,
recording the reported `Content-Length` and `ETag`.

A missing `Content-Length` header is treated as a 0-byte object.
"""
function Object(store::AbstractStore, key::String; credentials::Union{CloudCredentials, Nothing}=nothing)
    resp = API.headObject(store, makeURL(store, key), HTTP.Headers(); credentials=credentials)
    nbytes = parse(Int, HTTP.header(resp, "Content-Length", "0"))
    # strip the quotes the service wraps around the ETag value
    et = etag(HTTP.header(resp, "ETag", ""))
    return Object(store, credentials, key, nbytes, String(et))
end

# The "length" of an Object is its size in bytes, as reported by the service.
Base.length(x::Object) = x.size

"""
    Base.copyto!(dest::AbstractVector{UInt8}, doff, src::Object, soff, n) -> n

Download `n` bytes of `src` starting at 1-based byte offset `soff` into `dest`
starting at index `doff`, returning the number of bytes copied.

Throws `ArgumentError` for negative `n` or when either requested range would
run past the end of `src`/`dest`, and `BoundsError` for out-of-range offsets.
`n == 0` is a no-op: nothing is validated and no request is made.
"""
function Base.copyto!(dest::AbstractVector{UInt8}, doff::Integer, src::Object, soff::Integer, n::Integer)
    # validate arguments; guard n first so a zero-length copy at a boundary
    # offset neither throws nor issues a pointless ranged GET
    n < 0 && throw(ArgumentError("requested number of bytes (`$n`) must be non-negative"))
    n == 0 && return n
    0 < doff <= length(dest) || throw(BoundsError(dest, doff))
    0 < soff <= length(src) || throw(BoundsError(src, soff))
    (soff + n) - 1 <= length(src) || throw(ArgumentError("requested number of bytes (`$n`) would exceed source length"))
    (doff + n) - 1 <= length(dest) || throw(ArgumentError("requested number of bytes (`$n`) would exceed destination length"))
    return unsafe_copyto!(dest, doff, src, soff, n)
end

# Fetch bytes `soff:(soff + n - 1)` of `src` with a ranged GET and write them
# into `dest` starting at `doff`. No bounds checking here; callers go through
# `Base.copyto!` for validation.
function Base.unsafe_copyto!(dest::AbstractVector{UInt8}, doff::Integer, src::Object, soff::Integer, n::Integer)
    # HTTP byte ranges are 0-based and inclusive, hence the shift by one
    rng = (soff - 1):(soff + n - 2)
    hdrs = HTTP.Headers()
    HTTP.setheader(hdrs, contentRange(rng))
    resp = getObject(src.store, makeURL(src.store, src.key), hdrs; credentials=src.credentials)
    # NOTE(review): the response body is materialized and then copied; writing
    # into `dest` directly would avoid the extra copy — TODO confirm with HTTP.jl
    copyto!(dest, doff, resp.body)
    return n
end

# Minimal read-only IO wrapper over an `Object`, tracking a read position so
# the remote object can be consumed sequentially via `readbytes!`/`eof`.
mutable struct IOObject{T <: Object} <: IO
    object::T
    pos::Int  # 1-based index of the next byte to read
end

# Start reading from the first byte of `x`.
IOObject(x::Object) = IOObject(x, 1)
# Convenience: HEAD the object to learn its size/ETag, then wrap it in an IO.
IOObject(store::AbstractStore, key::String; credentials::Union{CloudCredentials, Nothing}=nothing) =
    IOObject(Object(store, key; credentials))

# EOF once the read position has advanced past the object's last byte.
Base.eof(x::IOObject) = x.pos > length(x.object)

"""
    Base.readbytes!(x::IOObject, dest::AbstractVector{UInt8}, n=length(dest)) -> dest

Read up to `n` bytes from `x` into `dest` starting at `dest[1]`, advancing the
read position. Matching `Base.readbytes!` semantics (relied upon by ChunkedCSV),
`dest` is grown via `resize!` when more than `length(dest)` bytes are available
and requested; it is never shrunk. Returns `dest`.
"""
function Base.readbytes!(x::IOObject, dest::AbstractVector{UInt8}, n::Integer=length(dest))
    n < 0 && throw(ArgumentError("requested number of bytes (`$n`) must be non-negative"))
    # never read past the end of the remote object
    nb = min(n, length(x.object) - x.pos + 1)
    nb <= 0 && return dest
    # grow dest as necessary to hold the incoming bytes; only Vector-like dests
    # support resize!, but growth only triggers when the caller asked for more
    # than fits — previously such requests were silently truncated
    nb > length(dest) && resize!(dest, nb)
    Base.unsafe_copyto!(dest, 1, x.object, x.pos, nb)
    x.pos += nb
    return dest
end
14 changes: 7 additions & 7 deletions src/put.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
partSize::Int=MULTIPART_SIZE,
batchSize::Int=defaultBatchSize(),
allowMultipart::Bool=true,
compress::Bool=false, kw...)
compress::Bool=false, credentials=nothing, kw...)

N = nbytes(in)
if N <= multipartThreshold || !allowMultipart
body = prepBody(in, compress)
resp = putObject(x, key, body; kw...)
return Object(x, key, "", etag(HTTP.header(resp, "ETag")), N, "")
resp = putObject(x, key, body; credentials, kw...)
return Object(x, credentials, key, N, etag(HTTP.header(resp, "ETag")))
end
# multipart upload
uploadState = startMultipartUpload(x, key; kw...)
uploadState = startMultipartUpload(x, key; credentials, kw...)
url = makeURL(x, key)
eTags = String[]
sync = OrderedSynchronizer(1)
Expand All @@ -74,7 +74,7 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
part = _read(body, partSize)
let n=n, part=part
Threads.@spawn begin
eTag = uploadPart(x, url, part, n, uploadState; kw...)
eTag = uploadPart(x, url, part, n, uploadState; credentials, kw...)
let eTag=eTag
# we synchronize the eTags here because the order matters
# for the final call to completeMultipartUpload
Expand All @@ -92,6 +92,6 @@ function putObjectImpl(x::AbstractStore, key::String, in::RequestBodyType;
if in isa String
close(body)
end
eTag = completeMultipartUpload(x, url, eTags, uploadState; kw...)
return Object(x, key, "", eTag, N, "")
eTag = completeMultipartUpload(x, url, eTags, uploadState; credentials, kw...)
return Object(x, credentials, key, N, eTag)
end
16 changes: 8 additions & 8 deletions src/s3.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,35 @@ const Credentials = AWS.Credentials

API.cloudName(::Bucket) = "S3"

object(b::Bucket, x) = Object(b, x["Key"], x["LastModified"], API.etag(x["ETag"]), parse(Int, x["Size"]), x["StorageClass"])
object(b::Bucket, creds, x) = Object(b, creds, x["Key"], parse(Int, x["Size"]), API.etag(x["ETag"]))

API.maxListKeys(::Bucket) = 1000
API.listMaxKeysQuery(::Bucket) = "max-keys"
API.continuationToken(::Bucket) = "continuation-token"

function API.listObjects(x::Bucket, query, result=nothing; kw...)
function API.listObjects(x::Bucket, query, result=nothing; credentials=nothing, kw...)
query["list-type"] = "2"
result = xml_dict(String(AWS.get(x.baseurl; query, service="s3", kw...).body))["ListBucketResult"]
result = xml_dict(String(AWS.get(x.baseurl; credentials, query, service="s3", kw...).body))["ListBucketResult"]
if parse(Int, result["KeyCount"]) == 0
return (Object[], "")
end
contents = map(y -> object(x, y), API.asArray(result["Contents"]))
contents = map(y -> object(x, credentials, y), API.asArray(result["Contents"]))
return (contents, result["IsTruncated"] == "true" ? result["NextContinuationToken"] : "")
end

list(x::Bucket; kw...) = API.listObjectsImpl(x; kw...)

API.getObject(x::Bucket, url, headers; kw...) = AWS.get(url, headers; service="s3", kw...)

get(x::Object, out::ResponseBodyType=nothing; kw...) = get(x.store, x.key, out; kw...)
get(x::Object, out::ResponseBodyType=nothing; kw...) = get(x.store, x.key, out; credentials=x.credentials, kw...)
get(args...; kw...) = API.getObjectImpl(args...; kw...)

API.headObject(x::Bucket, url, headers; kw...) = AWS.head(url; headers, service="s3", kw...)
head(x::Object; kw...) = head(x.store, x.key; kw...)
head(x::Object; kw...) = head(x.store, x.key; credentials=x.credentials, kw...)
head(x::Bucket, key::String; kw...) = API.headObjectImpl(x, key; kw...)

put(args...; kw...) = API.putObjectImpl(args...; kw...)
put(x::Object; kw...) = put(x.store, x.key; kw...)
put(x::Object; kw...) = put(x.store, x.key; credentials=x.credentials, kw...)

API.putObject(x::Bucket, key, body; kw...) = AWS.put(API.makeURL(x, key), [], body; service="s3", kw...)

Expand All @@ -58,7 +58,7 @@ function API.completeMultipartUpload(x::Bucket, url, eTags, uploadId; kw...)
end

delete(x::Bucket, key; kw...) = AWS.delete(API.makeURL(x, key); service="s3", kw...)
delete(x::Object; kw...) = delete(x.store, x.key; kw...)
delete(x::Object; kw...) = delete(x.store, x.key; credentials=x.credentials, kw...)

for func in (:list, :get, :head, :put, :delete)
@eval function $func(url::AbstractString, args...; region=nothing, nowarn::Bool=false, kw...)
Expand Down
25 changes: 22 additions & 3 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ end
]
for (url, parts) in azure
ok, host, account, container, blob = CloudStore.parseAzureAccountContainerBlob(url; parseLocal=true)
@show url, ok, host, account, container, blob
@test ok
@test host == parts[2]
@test account == parts[3]
Expand Down Expand Up @@ -255,12 +254,32 @@ end
]
for (url, parts) in s3
ok, accelerate, host, bucket, reg, key = CloudStore.parseAWSBucketRegionKey(url; parseLocal=true)
@show url, ok, accelerate, host, bucket, reg, key
@test ok
@test accelerate == parts[2]
@test host == parts[3]
@test bucket == parts[4]
@test reg == parts[5]
@test key == parts[6]
end
end
end

# End-to-end exercise of the lazy `Object`/`IOObject` API against a local
# Minio (S3-compatible) server spun up by CloudTest.
@testset "CloudStore.Object API" begin
    Minio.with(; debug=true) do conf
        credentials, bucket = conf
        multicsv = "1,2,3,4,5,6,7,8,9,1\n"^1000000; # 20MB
        S3.put(bucket, "test.csv", codeunits(multicsv); credentials)
        # HEAD-based constructor should report the uploaded size
        obj = CloudStore.Object(bucket, "test.csv"; credentials)
        @test length(obj) == sizeof(multicsv)
        # ranged download straight into a preallocated buffer
        buf = Vector{UInt8}(undef, 1000)
        copyto!(buf, 1, obj, 1, 1000)
        @test buf == view(codeunits(multicsv), 1:1000)

        # sequential reads through the IO wrapper should walk the whole object
        # in 1000-byte chunks (20MB is an exact multiple of 1000)
        ioobj = CloudStore.IOObject(obj)
        i = 1
        while !eof(ioobj)
            readbytes!(ioobj, buf, 1000)
            @test buf == view(codeunits(multicsv), i:min(i+999, length(multicsv)))
            i += 1000
        end
    end
end