blob: 01c585e654453d46f18e55a94697e3214c976f32 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::VecDeque;
use std::mem;
use async_trait::async_trait;
use crate::raw::*;
use crate::*;
/// to_flat_pager is used to make a hierarchy pager flat.
pub fn to_flat_pager<A: Accessor, P>(acc: A, path: &str, size: usize) -> ToFlatPager<A, P> {
#[cfg(debug_assertions)]
{
let meta = acc.info();
debug_assert!(
meta.capability().list_with_delimiter_slash,
"service doesn't support list hierarchy, it must be a bug"
);
}
ToFlatPager {
acc,
size,
root: path.to_string(),
dirs: VecDeque::from([oio::Entry::new(path, Metadata::new(EntryMode::DIR))]),
pagers: vec![],
res: Vec::with_capacity(size),
}
}
/// ToFlatPager will walk dir in bottom up way:
///
/// - List nested dir first
/// - Go back into parent dirs one by one
///
/// Given the following file tree:
///
/// ```txt
/// .
/// ├── dir_x/
/// │ ├── dir_y/
/// │ │ ├── dir_z/
/// │ │ └── file_c
/// │ └── file_b
/// └── file_a
/// ```
///
/// ToFlatPager will output entries like:
///
/// ```txt
/// dir_x/dir_y/dir_z/file_c
/// dir_x/dir_y/dir_z/
/// dir_x/dir_y/file_b
/// dir_x/dir_y/
/// dir_x/file_a
/// dir_x/
/// ```
///
/// # Note
///
/// There is no guarantee about the order between files and dirs at the same level.
/// We only make sure the nested dirs will show up before parent dirs.
///
/// Especially, for storage services that can't return dirs first, ToFlatPager
/// may output parent dirs' files before nested dirs, this is expected because files
/// always output directly while listing.
pub struct ToFlatPager<A: Accessor, P> {
acc: A,
size: usize,
root: String,
dirs: VecDeque<oio::Entry>,
pagers: Vec<(P, oio::Entry, Vec<oio::Entry>)>,
res: Vec<oio::Entry>,
}
#[async_trait]
impl<A, P> oio::Page for ToFlatPager<A, P>
where
A: Accessor<Pager = P>,
P: oio::Page,
{
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
loop {
if let Some(de) = self.dirs.pop_back() {
let (_, op) = self.acc.list(de.path(), OpList::new()).await?;
self.pagers.push((op, de, vec![]))
}
let (mut pager, de, mut buf) = match self.pagers.pop() {
Some((pager, de, buf)) => (pager, de, buf),
None => {
if !self.res.is_empty() {
return Ok(Some(mem::take(&mut self.res)));
}
return Ok(None);
}
};
if buf.is_empty() {
match pager.next().await? {
Some(v) => {
buf = v;
}
None => {
// Only push entry if it's not root dir
if de.path() != self.root {
self.res.push(de);
}
continue;
}
}
}
let mut buf = VecDeque::from(buf);
loop {
if let Some(oe) = buf.pop_front() {
if oe.mode().is_dir() {
self.dirs.push_back(oe);
self.pagers.push((pager, de, buf.into()));
break;
} else {
self.res.push(oe)
}
} else {
self.pagers.push((pager, de, vec![]));
break;
}
}
if self.res.len() >= self.size {
return Ok(Some(mem::take(&mut self.res)));
}
}
}
}
impl<A, P> oio::BlockingPage for ToFlatPager<A, P>
where
A: Accessor<BlockingPager = P>,
P: oio::BlockingPage,
{
fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
loop {
if let Some(de) = self.dirs.pop_back() {
let (_, op) = self.acc.blocking_list(de.path(), OpList::new())?;
self.pagers.push((op, de, vec![]))
}
let (mut pager, de, mut buf) = match self.pagers.pop() {
Some((pager, de, buf)) => (pager, de, buf),
None => {
if !self.res.is_empty() {
return Ok(Some(mem::take(&mut self.res)));
}
return Ok(None);
}
};
if buf.is_empty() {
match pager.next()? {
Some(v) => {
buf = v;
}
None => {
// Only push entry if it's not root dir
if de.path() != self.root {
self.res.push(de);
}
continue;
}
}
}
let mut buf = VecDeque::from(buf);
loop {
if let Some(oe) = buf.pop_front() {
if oe.mode().is_dir() {
self.dirs.push_back(oe);
self.pagers.push((pager, de, buf.into()));
break;
} else {
self.res.push(oe)
}
} else {
self.pagers.push((pager, de, vec![]));
break;
}
}
if self.res.len() >= self.size {
return Ok(Some(mem::take(&mut self.res)));
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::vec;
use log::debug;
use oio::BlockingPage;
use super::*;
#[derive(Debug)]
struct MockService {
map: HashMap<&'static str, Vec<&'static str>>,
}
impl MockService {
fn new() -> Self {
let mut map = HashMap::default();
map.insert("x/", vec!["x/x/"]);
map.insert("x/x/", vec!["x/x/x/"]);
map.insert("x/x/x/", vec!["x/x/x/x"]);
Self { map }
}
fn get(&self, path: &str) -> MockPager {
let inner = self.map.get(path).expect("must have value").to_vec();
MockPager { inner, done: false }
}
}
#[async_trait]
impl Accessor for MockService {
type Reader = ();
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
type Appender = ();
type Pager = ();
type BlockingPager = MockPager;
fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.capability_mut().list = true;
am.capability_mut().list_with_delimiter_slash = true;
am
}
fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingPager)> {
debug!("visit path: {path}");
Ok((RpList::default(), self.get(path)))
}
}
struct MockPager {
inner: Vec<&'static str>,
done: bool,
}
impl BlockingPage for MockPager {
fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
if self.done {
return Ok(None);
}
self.done = true;
let entries = self
.inner
.iter()
.map(|path| {
if path.ends_with('/') {
oio::Entry::new(path, Metadata::new(EntryMode::DIR))
} else {
oio::Entry::new(path, Metadata::new(EntryMode::FILE))
}
})
.collect();
Ok(Some(entries))
}
}
#[test]
fn test_blocking_list() -> Result<()> {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let acc = MockService::new();
let mut pager = to_flat_pager(acc, "x/", 10);
let mut entries = Vec::default();
while let Some(e) = pager.next()? {
entries.extend_from_slice(&e)
}
assert_eq!(
entries[0],
oio::Entry::new("x/x/x/x", Metadata::new(EntryMode::FILE))
);
Ok(())
}
}