// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use async_trait::async_trait;
use bytes::Buf;
use quick_xml::de;
use serde::Deserialize;

use super::core::AzblobCore;
use super::error::parse_error;
use crate::raw::*;
use crate::*;

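/// Pager over the results of Azure Blob Storage's `List Blobs` API.
///
/// Each call to `next` issues one `azblob_list_blobs` request and converts the
/// returned blobs and blob prefixes into `oio::Entry` values.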
pub struct AzblobPager {
    core: Arc<AzblobCore>,

    path: String,
    delimiter: String,
    limit: Option<usize>,

    /// Continuation marker returned by the previous `List Blobs` response.
    next_marker: String,
    /// Whether the listing has reached its final page.
    done: bool,
}

impl AzblobPager {
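    /// Create a new pager.
    ///
    /// `limit` caps the number of results requested per page; `delimiter` is
    /// typically `"/"` so that common prefixes are returned as directories.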
pub fn new(
core: Arc<AzblobCore>,
path: String,
delimiter: String,
limit: Option<usize>,
) -> Self {
Self {
core,
path,
delimiter,
limit,
next_marker: "".to_string(),
done: false,
}
}
}

#[async_trait]
impl oio::Page for AzblobPager {
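    /// Fetch the next page of entries from the service.
    ///
    /// Returns `Ok(Some(entries))` for each page and `Ok(None)` once the
    /// listing is exhausted.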
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
if self.done {
return Ok(None);
}
let resp = self
.core
.azblob_list_blobs(&self.path, &self.next_marker, &self.delimiter, self.limit)
.await?;
if resp.status() != http::StatusCode::OK {
return Err(parse_error(resp).await?);
}
let bs = resp.into_body().bytes().await?;
let output: Output = de::from_reader(bs.reader()).map_err(|e| {
Error::new(ErrorKind::Unexpected, "deserialize xml from response").set_source(e)
})?;
        // Azure signals the end of the listing with an empty `NextMarker`.
        // If the marker is present and empty, the listing is done; otherwise
        // carry the marker over into the next request.
if let Some(next_marker) = output.next_marker.as_ref() {
self.done = next_marker.is_empty();
};
self.next_marker = output.next_marker.clone().unwrap_or_default();
let prefixes = output.blobs.blob_prefix;
let mut entries = Vec::with_capacity(prefixes.len() + output.blobs.blob.len());
for prefix in prefixes {
let de = oio::Entry::new(
&build_rel_path(&self.core.root, &prefix.name),
Metadata::new(EntryMode::DIR),
);
entries.push(de)
}
for object in output.blobs.blob {
            // Azblob may return the directory itself as a blob entry, with a
            // name ending in `/`. Such entries should be skipped.
if object.name.ends_with('/') {
continue;
}
let meta = Metadata::new(EntryMode::FILE)
                // Quote the value so it matches the format of the `ETag` header.
.with_etag(format!("\"{}\"", object.properties.etag.as_str()))
.with_content_length(object.properties.content_length)
.with_content_md5(object.properties.content_md5)
.with_content_type(object.properties.content_type)
.with_last_modified(parse_datetime_from_rfc2822(
object.properties.last_modified.as_str(),
)?);
let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.name), meta);
entries.push(de);
}
Ok(Some(entries))
}
}
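
/// In-memory form of the `EnumerationResults` body returned by the Azure Blob
/// `List Blobs` API.
///
/// `#[serde(default)]` is applied because Azure may omit elements or leave
/// them empty.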
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
struct Output {
    blobs: Blobs,
    #[serde(rename = "NextMarker")]
    next_marker: Option<String>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
struct Blobs {
    blob: Vec<Blob>,
    blob_prefix: Vec<BlobPrefix>,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
struct BlobPrefix {
    name: String,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
struct Blob {
    properties: Properties,
    name: String,
}

#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
struct Properties {
    #[serde(rename = "Content-Length")]
    content_length: u64,
    #[serde(rename = "Last-Modified")]
    last_modified: String,
    #[serde(rename = "Content-MD5")]
    content_md5: String,
    #[serde(rename = "Content-Type")]
    content_type: String,
    etag: String,
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;

#[test]
fn test_parse_xml() {
let bs = bytes::Bytes::from(
r#"
<?xml version="1.0" encoding="utf-8"?>
<EnumerationResults ServiceEndpoint="https://test.blob.core.windows.net/" ContainerName="myazurebucket">
<Prefix>dir1/</Prefix>
<Delimiter>/</Delimiter>
<Blobs>
<Blob>
<Name>dir1/2f018bb5-466f-4af1-84fa-2b167374ee06</Name>
<Properties>
<Creation-Time>Sun, 20 Mar 2022 11:29:03 GMT</Creation-Time>
<Last-Modified>Sun, 20 Mar 2022 11:29:03 GMT</Last-Modified>
<Etag>0x8DA0A64D66790C3</Etag>
<Content-Length>3485277</Content-Length>
<Content-Type>application/octet-stream</Content-Type>
<Content-Encoding />
<Content-Language />
<Content-CRC64 />
<Content-MD5>llJ/+jOlx5GdA1sL7SdKuw==</Content-MD5>
<Cache-Control />
<Content-Disposition />
<BlobType>BlockBlob</BlobType>
<AccessTier>Hot</AccessTier>
<AccessTierInferred>true</AccessTierInferred>
<LeaseStatus>unlocked</LeaseStatus>
<LeaseState>available</LeaseState>
<ServerEncrypted>true</ServerEncrypted>
</Properties>
<OrMetadata />
</Blob>
<Blob>
<Name>dir1/5b9432b2-79c0-48d8-90c2-7d3e153826ed</Name>
<Properties>
<Creation-Time>Tue, 29 Mar 2022 01:54:07 GMT</Creation-Time>
<Last-Modified>Tue, 29 Mar 2022 01:54:07 GMT</Last-Modified>
<Etag>0x8DA112702D88FE4</Etag>
<Content-Length>2471869</Content-Length>
<Content-Type>application/octet-stream</Content-Type>
<Content-Encoding />
<Content-Language />
<Content-CRC64 />
<Content-MD5>xmgUltSnopLSJOukgCHFtg==</Content-MD5>
<Cache-Control />
<Content-Disposition />
<BlobType>BlockBlob</BlobType>
<AccessTier>Hot</AccessTier>
<AccessTierInferred>true</AccessTierInferred>
<LeaseStatus>unlocked</LeaseStatus>
<LeaseState>available</LeaseState>
<ServerEncrypted>true</ServerEncrypted>
</Properties>
<OrMetadata />
</Blob>
<Blob>
<Name>dir1/b2d96f8b-d467-40d1-bb11-4632dddbf5b5</Name>
<Properties>
<Creation-Time>Sun, 20 Mar 2022 11:31:57 GMT</Creation-Time>
<Last-Modified>Sun, 20 Mar 2022 11:31:57 GMT</Last-Modified>
<Etag>0x8DA0A653DC82981</Etag>
<Content-Length>1259677</Content-Length>
<Content-Type>application/octet-stream</Content-Type>
<Content-Encoding />
<Content-Language />
<Content-CRC64 />
<Content-MD5>AxTiFXHwrXKaZC5b7ZRybw==</Content-MD5>
<Cache-Control />
<Content-Disposition />
<BlobType>BlockBlob</BlobType>
<AccessTier>Hot</AccessTier>
<AccessTierInferred>true</AccessTierInferred>
<LeaseStatus>unlocked</LeaseStatus>
<LeaseState>available</LeaseState>
<ServerEncrypted>true</ServerEncrypted>
</Properties>
<OrMetadata />
</Blob>
<BlobPrefix>
<Name>dir1/dir2/</Name>
</BlobPrefix>
<BlobPrefix>
<Name>dir1/dir21/</Name>
</BlobPrefix>
</Blobs>
<NextMarker />
</EnumerationResults>"#,
);
        let out: Output = de::from_reader(bs.reader()).expect("must succeed");
println!("{out:?}");
assert_eq!(
out.blobs
.blob
.iter()
.map(|v| v.name.clone())
.collect::<Vec<String>>(),
[
"dir1/2f018bb5-466f-4af1-84fa-2b167374ee06",
"dir1/5b9432b2-79c0-48d8-90c2-7d3e153826ed",
"dir1/b2d96f8b-d467-40d1-bb11-4632dddbf5b5"
]
);
assert_eq!(
out.blobs
.blob
.iter()
.map(|v| v.properties.content_length)
.collect::<Vec<u64>>(),
[3485277, 2471869, 1259677]
);
assert_eq!(
out.blobs
.blob
.iter()
.map(|v| v.properties.content_md5.clone())
.collect::<Vec<String>>(),
[
"llJ/+jOlx5GdA1sL7SdKuw==".to_string(),
"xmgUltSnopLSJOukgCHFtg==".to_string(),
"AxTiFXHwrXKaZC5b7ZRybw==".to_string()
]
);
assert_eq!(
out.blobs
.blob
.iter()
.map(|v| v.properties.last_modified.clone())
.collect::<Vec<String>>(),
[
"Sun, 20 Mar 2022 11:29:03 GMT".to_string(),
"Tue, 29 Mar 2022 01:54:07 GMT".to_string(),
"Sun, 20 Mar 2022 11:31:57 GMT".to_string()
]
);
assert_eq!(
out.blobs
.blob
.iter()
.map(|v| v.properties.etag.clone())
.collect::<Vec<String>>(),
[
"0x8DA0A64D66790C3".to_string(),
"0x8DA112702D88FE4".to_string(),
"0x8DA0A653DC82981".to_string()
]
);
assert_eq!(
out.blobs
.blob_prefix
.iter()
.map(|v| v.name.clone())
.collect::<Vec<String>>(),
["dir1/dir2/", "dir1/dir21/"]
);
}

    /// This case is copied from a real environment to exercise quick-xml's
    /// `overlapped-lists` feature. By default, quick-xml can't deserialize
    /// content with overlapped lists.
    ///
    /// For example, this case lists blobs in the following way:
    ///
    /// ```xml
    /// <Blobs>
    ///     <Blob>xxx</Blob>
    ///     <BlobPrefix>yyy</BlobPrefix>
    ///     <Blob>zzz</Blob>
    /// </Blobs>
    /// ```
    ///
    /// If the `overlapped-lists` feature is not enabled, deserialization fails
    /// with `duplicate field Blob`.
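    ///
    /// The `overlapped-lists` feature is therefore expected to be enabled on
    /// the `quick-xml` dependency in `Cargo.toml`.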
#[test]
fn test_parse_overlapped_lists() {
let bs = "<?xml version=\"1.0\" encoding=\"utf-8\"?><EnumerationResults ServiceEndpoint=\"https://test.blob.core.windows.net/\" ContainerName=\"test\"><Prefix>9f7075e1-84d0-45ca-8196-ab9b71a8ef97/x/</Prefix><Delimiter>/</Delimiter><Blobs><Blob><Name>9f7075e1-84d0-45ca-8196-ab9b71a8ef97/x/</Name><Properties><Creation-Time>Thu, 01 Sep 2022 07:26:49 GMT</Creation-Time><Last-Modified>Thu, 01 Sep 2022 07:26:49 GMT</Last-Modified><Etag>0x8DA8BEB55D0EA35</Etag><Content-Length>0</Content-Length><Content-Type>application/octet-stream</Content-Type><Content-Encoding /><Content-Language /><Content-CRC64 /><Content-MD5>1B2M2Y8AsgTpgAmY7PhCfg==</Content-MD5><Cache-Control /><Content-Disposition /><BlobType>BlockBlob</BlobType><AccessTier>Hot</AccessTier><AccessTierInferred>true</AccessTierInferred><LeaseStatus>unlocked</LeaseStatus><LeaseState>available</LeaseState><ServerEncrypted>true</ServerEncrypted></Properties><OrMetadata /></Blob><BlobPrefix><Name>9f7075e1-84d0-45ca-8196-ab9b71a8ef97/x/x/</Name></BlobPrefix><Blob><Name>9f7075e1-84d0-45ca-8196-ab9b71a8ef97/x/y</Name><Properties><Creation-Time>Thu, 01 Sep 2022 07:26:50 GMT</Creation-Time><Last-Modified>Thu, 01 Sep 2022 07:26:50 GMT</Last-Modified><Etag>0x8DA8BEB55D99C08</Etag><Content-Length>0</Content-Length><Content-Type>application/octet-stream</Content-Type><Content-Encoding /><Content-Language /><Content-CRC64 /><Content-MD5>1B2M2Y8AsgTpgAmY7PhCfg==</Content-MD5><Cache-Control /><Content-Disposition /><BlobType>BlockBlob</BlobType><AccessTier>Hot</AccessTier><AccessTierInferred>true</AccessTierInferred><LeaseStatus>unlocked</LeaseStatus><LeaseState>available</LeaseState><ServerEncrypted>true</ServerEncrypted></Properties><OrMetadata /></Blob></Blobs><NextMarker /></EnumerationResults>";
        let _out: Output = de::from_reader(Bytes::from(bs).reader()).expect("must succeed");
}
}