// blob: e3e29a027f8ed5319c8379197f3b497e69528b32 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::de;
use super::core::AzdfsCore;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
/// Pager that walks an Azdfs listing one page at a time.
///
/// Each page is fetched through `AzdfsCore::azdfs_list`; the paging state
/// (`continuation`, `done`) is carried between calls on the pager itself.
pub struct AzdfsPager {
    /// Shared backend core used to issue the list requests.
    core: Arc<AzdfsCore>,

    /// Path handed to every list request.
    path: String,
    /// Optional limit forwarded unchanged to every list request.
    limit: Option<usize>,

    /// Continuation token sent with the next list request.
    ///
    /// Empty until a response provides one via `x-ms-continuation`.
    continuation: String,
    /// `true` once no further list request should be issued.
    done: bool,
}
impl AzdfsPager {
    /// Builds a pager positioned before the first page.
    ///
    /// `core`, `path` and `limit` are stored as given; the continuation token
    /// starts out empty and the pager is not yet marked as done.
    pub fn new(core: Arc<AzdfsCore>, path: String, limit: Option<usize>) -> Self {
        let continuation = String::new();
        let done = false;

        Self {
            core,
            path,
            limit,
            continuation,
            done,
        }
    }
}
#[async_trait]
impl oio::Page for AzdfsPager {
    /// Fetches the next page of entries.
    ///
    /// Returns `Ok(None)` once the listing is exhausted or when the service
    /// answers `404 Not Found` for the listed path; otherwise returns the
    /// entries of the current page.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the service answers with a
    /// status other than `200 OK` / `404 Not Found`, the continuation header
    /// is not a valid string, or the response body cannot be decoded into
    /// entries. The paging state is only advanced after a page has been
    /// decoded successfully, so a failed call can be repeated without
    /// skipping a page.
    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
        if self.done {
            return Ok(None);
        }

        let resp = self
            .core
            .azdfs_list(&self.path, &self.continuation, self.limit)
            .await?;

        // Azdfs will return not found for not-exist path.
        if resp.status() == http::StatusCode::NOT_FOUND {
            resp.into_body().consume().await?;
            // Nothing can follow a missing path: remember that so later
            // calls do not send the same request again.
            self.done = true;
            return Ok(None);
        }

        if resp.status() != http::StatusCode::OK {
            return Err(parse_error(resp).await?);
        }

        // Stage the continuation token locally. It is committed to `self`
        // only after the whole page has been decoded, so an error below
        // leaves the pager pointing at the page that failed.
        let next_continuation = resp
            .headers()
            .get("x-ms-continuation")
            .map(|value| {
                value.to_str().map(str::to_owned).map_err(|err| {
                    Error::new(ErrorKind::Unexpected, "header value is not valid string")
                        .set_source(err)
                })
            })
            .transpose()?;

        let bs = resp.into_body().bytes().await?;

        let output: Output = de::from_slice(&bs).map_err(|e| {
            Error::new(ErrorKind::Unexpected, "deserialize json from response").set_source(e)
        })?;

        let mut entries = Vec::with_capacity(output.paths.len());

        for object in output.paths {
            // Azdfs will return `"true"` and `"false"` for is_directory.
            let mode = if object.is_directory == "true" {
                EntryMode::DIR
            } else {
                EntryMode::FILE
            };

            let meta = Metadata::new(mode)
                // Keep fit with ETag header.
                .with_etag(format!("\"{}\"", &object.etag))
                .with_content_length(object.content_length.parse().map_err(|err| {
                    Error::new(ErrorKind::Unexpected, "content length is not valid integer")
                        .set_source(err)
                })?)
                .with_last_modified(parse_datetime_from_rfc2822(&object.last_modified)?);

            // Directory entries are reported with a trailing slash.
            let mut path = build_rel_path(&self.core.root, &object.name);
            if mode == EntryMode::DIR {
                path.push('/');
            }

            entries.push(oio::Entry::new(&path, meta));
        }

        // The page decoded cleanly: now advance the paging state. A missing
        // continuation header means this was the last page.
        match next_continuation {
            Some(token) => self.continuation = token,
            None => {
                self.continuation.clear();
                self.done = true;
            }
        }

        Ok(Some(entries))
    }
}
/// Body of an Azdfs list response.
///
/// Only the `paths` array is decoded; when it is absent the list defaults to
/// empty.
///
/// # Examples
///
/// ```json
/// {"paths":[{"contentLength":"1977097","etag":"0x8DACF9B0061305F","group":"$superuser","lastModified":"Sat, 26 Nov 2022 10:43:05 GMT","name":"c3b3ef48-7783-4946-81bc-dc07e1728878/d4ea21d7-a533-4011-8b1f-d0e566d63725","owner":"$superuser","permissions":"rw-r-----"}]}
/// ```
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct Output {
    /// Entries contained in this page of the listing.
    paths: Vec<Path>,
}
/// One element of the `paths` array in an Azdfs list response.
///
/// Every value is kept as the string the service sent. Fields missing from
/// the JSON fall back to an empty string, and JSON keys are the camelCase
/// form of the field names (`contentLength`, `isDirectory`, ...).
#[derive(Debug, Default, PartialEq, Eq, Deserialize)]
#[serde(default, rename_all = "camelCase")]
struct Path {
    /// Size of the entry as a decimal string, e.g. `"1977097"`.
    content_length: String,
    /// ETag value without surrounding quotes, e.g. `"0x8DACF9B0061305F"`.
    etag: String,
    /// Azdfs will return `"true"` and `"false"` for is_directory.
    is_directory: String,
    /// Last modification time, e.g. `"Sat, 26 Nov 2022 10:43:05 GMT"`.
    last_modified: String,
    /// Name of the entry as reported by the service.
    name: String,
}
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;

    /// Decodes a captured list response and checks the first entry.
    ///
    /// The sample has no `isDirectory` key, so that field must come back as
    /// the default empty string.
    #[test]
    fn test_parse_path() {
        let raw = Bytes::from(
            r#"{"paths":[{"contentLength":"1977097","etag":"0x8DACF9B0061305F","group":"$superuser","lastModified":"Sat, 26 Nov 2022 10:43:05 GMT","name":"c3b3ef48-7783-4946-81bc-dc07e1728878/d4ea21d7-a533-4011-8b1f-d0e566d63725","owner":"$superuser","permissions":"rw-r-----"}]}"#,
        );

        let expected = Path {
            content_length: String::from("1977097"),
            etag: String::from("0x8DACF9B0061305F"),
            is_directory: String::new(),
            last_modified: String::from("Sat, 26 Nov 2022 10:43:05 GMT"),
            name: String::from(
                "c3b3ef48-7783-4946-81bc-dc07e1728878/d4ea21d7-a533-4011-8b1f-d0e566d63725",
            ),
        };

        let out: Output = de::from_slice(&raw).expect("must success");
        println!("{out:?}");

        assert_eq!(out.paths[0], expected);
    }
}