| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::fmt::Debug; |
| |
| use async_trait::async_trait; |
| use bytes::Bytes; |
| use http::header; |
| use http::Request; |
| use http::Response; |
| use http::StatusCode; |
| |
| use super::error::parse_error; |
| use super::graph_model::CreateDirPayload; |
| use super::graph_model::ItemType; |
| use super::graph_model::OneDriveUploadSessionCreationRequestBody; |
| use super::graph_model::OnedriveGetItemBody; |
| use super::pager::OnedrivePager; |
| use super::writer::OneDriveWriter; |
| use crate::raw::*; |
| use crate::*; |
| |
| #[derive(Clone)] |
| pub struct OnedriveBackend { |
| root: String, |
| access_token: String, |
| client: HttpClient, |
| } |
| |
| impl OnedriveBackend { |
| pub(crate) fn new(root: String, access_token: String, http_client: HttpClient) -> Self { |
| Self { |
| root, |
| access_token, |
| client: http_client, |
| } |
| } |
| } |
| |
| impl Debug for OnedriveBackend { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| let mut de = f.debug_struct("OneDriveBackend"); |
| de.field("root", &self.root); |
| de.field("access_token", &self.access_token); |
| de.finish() |
| } |
| } |
| |
| #[async_trait] |
| impl Accessor for OnedriveBackend { |
| type Reader = IncomingAsyncBody; |
| type BlockingReader = (); |
| type Writer = OneDriveWriter; |
| type BlockingWriter = (); |
| type Appender = (); |
| type Pager = OnedrivePager; |
| type BlockingPager = (); |
| |
| fn info(&self) -> AccessorInfo { |
| let mut ma = AccessorInfo::default(); |
| ma.set_scheme(crate::Scheme::Onedrive) |
| .set_root(&self.root) |
| .set_capability(Capability { |
| read: true, |
| write: true, |
| stat: true, |
| delete: true, |
| create_dir: true, |
| list: true, |
| list_with_delimiter_slash: true, |
| ..Default::default() |
| }); |
| |
| ma |
| } |
| |
| async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { |
| let resp = self.onedrive_get_content(path).await?; |
| |
| let status = resp.status(); |
| |
| match status { |
| StatusCode::OK | StatusCode::PARTIAL_CONTENT => { |
| let meta = parse_into_metadata(path, resp.headers())?; |
| Ok((RpRead::with_metadata(meta), resp.into_body())) |
| } |
| |
| _ => Err(parse_error(resp).await?), |
| } |
| } |
| |
| async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { |
| if args.content_length().is_none() { |
| return Err(Error::new( |
| ErrorKind::Unsupported, |
| "write without content length is not supported", |
| )); |
| } |
| |
| let path = build_rooted_abs_path(&self.root, path); |
| |
| Ok(( |
| RpWrite::default(), |
| OneDriveWriter::new(self.clone(), args, path), |
| )) |
| } |
| |
| async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { |
| // Stat root always returns a DIR. |
| if path == "/" { |
| return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); |
| } |
| |
| let resp = self.onedrive_get_stat(path).await?; |
| let status = resp.status(); |
| |
| if status.is_success() { |
| let bytes = resp.into_body().bytes().await?; |
| let decoded_response = serde_json::from_slice::<OnedriveGetItemBody>(&bytes) |
| .map_err(new_json_deserialize_error)?; |
| |
| let entry_mode: EntryMode = match decoded_response.item_type { |
| ItemType::Folder { .. } => EntryMode::DIR, |
| ItemType::File { .. } => EntryMode::FILE, |
| }; |
| |
| let mut meta = Metadata::new(entry_mode); |
| meta.set_etag(&decoded_response.e_tag); |
| |
| let last_modified = decoded_response.last_modified_date_time; |
| let date_utc_last_modified = parse_datetime_from_rfc3339(&last_modified)?; |
| meta.set_last_modified(date_utc_last_modified); |
| |
| meta.set_content_length(decoded_response.size); |
| |
| Ok(RpStat::new(meta)) |
| } else { |
| match status { |
| StatusCode::NOT_FOUND if path.ends_with('/') => { |
| Ok(RpStat::new(Metadata::new(EntryMode::DIR))) |
| } |
| _ => Err(parse_error(resp).await?), |
| } |
| } |
| } |
| |
| /// Delete operation |
| /// Documentation: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_delete?view=odsp-graph-online |
| async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> { |
| let resp = self.onedrive_delete(path).await?; |
| |
| let status = resp.status(); |
| |
| match status { |
| StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(RpDelete::default()), |
| _ => Err(parse_error(resp).await?), |
| } |
| } |
| |
| async fn list(&self, path: &str, _op_list: OpList) -> Result<(RpList, Self::Pager)> { |
| let pager: OnedrivePager = OnedrivePager::new(self.root.clone(), path.into(), self.clone()); |
| |
| Ok((RpList::default(), pager)) |
| } |
| |
| async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> { |
| let path = build_rooted_abs_path(&self.root, path); |
| let path_before_last_slash = get_parent(&path); |
| let encoded_path = percent_encode_path(path_before_last_slash); |
| |
| let uri = format!( |
| "https://graph.microsoft.com/v1.0/me/drive/root:{}:/children", |
| encoded_path |
| ); |
| |
| let folder_name = get_basename(&path); |
| let folder_name = folder_name.strip_suffix('/').unwrap_or(folder_name); |
| |
| let body = CreateDirPayload::new(folder_name.to_string()); |
| |
| let response = self.onedrive_create_dir(&uri, body).await?; |
| |
| let status = response.status(); |
| match status { |
| StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()), |
| _ => Err(parse_error(response).await?), |
| } |
| } |
| } |
| |
| impl OnedriveBackend { |
| pub(crate) const BASE_URL: &'static str = "https://graph.microsoft.com/v1.0/me"; |
| |
| async fn onedrive_get_stat(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { |
| let path = build_rooted_abs_path(&self.root, path); |
| let url: String = format!( |
| "https://graph.microsoft.com/v1.0/me/drive/root:{}{}", |
| percent_encode_path(&path), |
| "" |
| ); |
| |
| let mut req = Request::get(&url); |
| |
| let auth_header_content = format!("Bearer {}", self.access_token); |
| req = req.header(header::AUTHORIZATION, auth_header_content); |
| |
| let req = req |
| .body(AsyncBody::Empty) |
| .map_err(new_request_build_error)?; |
| |
| self.client.send(req).await |
| } |
| |
| pub(crate) async fn onedrive_get_next_list_page( |
| &self, |
| url: &str, |
| ) -> Result<Response<IncomingAsyncBody>> { |
| let mut req = Request::get(url); |
| |
| let auth_header_content = format!("Bearer {}", self.access_token); |
| req = req.header(header::AUTHORIZATION, auth_header_content); |
| |
| let req = req |
| .body(AsyncBody::Empty) |
| .map_err(new_request_build_error)?; |
| |
| self.client.send(req).await |
| } |
| |
| async fn onedrive_get_content(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { |
| let path = build_rooted_abs_path(&self.root, path); |
| let url: String = format!( |
| "https://graph.microsoft.com/v1.0/me/drive/root:{}{}", |
| percent_encode_path(&path), |
| ":/content" |
| ); |
| |
| let mut req = Request::get(&url); |
| |
| let auth_header_content = format!("Bearer {}", self.access_token); |
| req = req.header(header::AUTHORIZATION, auth_header_content); |
| |
| let req = req |
| .body(AsyncBody::Empty) |
| .map_err(new_request_build_error)?; |
| |
| self.client.send(req).await |
| } |
| |
| pub async fn onedrive_upload_simple( |
| &self, |
| path: &str, |
| size: Option<usize>, |
| content_type: Option<&str>, |
| body: AsyncBody, |
| ) -> Result<Response<IncomingAsyncBody>> { |
| let url = format!( |
| "https://graph.microsoft.com/v1.0/me/drive/root:{}:/content", |
| percent_encode_path(path) |
| ); |
| |
| let mut req = Request::put(&url); |
| |
| let auth_header_content = format!("Bearer {}", self.access_token); |
| req = req.header(header::AUTHORIZATION, auth_header_content); |
| |
| if let Some(size) = size { |
| req = req.header(header::CONTENT_LENGTH, size) |
| } |
| |
| if let Some(mime) = content_type { |
| req = req.header(header::CONTENT_TYPE, mime) |
| } |
| |
| let req = req.body(body).map_err(new_request_build_error)?; |
| |
| self.client.send(req).await |
| } |
| |
| pub(crate) async fn onedrive_chunked_upload( |
| &self, |
| url: &str, |
| content_type: Option<&str>, |
| offset: usize, |
| chunk_end: usize, |
| total_len: usize, |
| body: AsyncBody, |
| ) -> Result<Response<IncomingAsyncBody>> { |
| let mut req = Request::put(url); |
| |
| let auth_header_content = format!("Bearer {}", self.access_token); |
| req = req.header(header::AUTHORIZATION, auth_header_content); |
| |
| let range = format!("bytes {}-{}/{}", offset, chunk_end, total_len); |
| req = req.header("Content-Range".to_string(), range); |
| |
| let size = chunk_end - offset + 1; |
| req = req.header(header::CONTENT_LENGTH, size.to_string()); |
| |
| if let Some(mime) = content_type { |
| req = req.header(header::CONTENT_TYPE, mime) |
| } |
| |
| let req = req.body(body).map_err(new_request_build_error)?; |
| |
| self.client.send(req).await |
| } |
| |
| pub(crate) async fn onedrive_create_upload_session( |
| &self, |
| url: &str, |
| body: OneDriveUploadSessionCreationRequestBody, |
| ) -> Result<Response<IncomingAsyncBody>> { |
| let mut req = Request::post(url); |
| |
| let auth_header_content = format!("Bearer {}", self.access_token); |
| req = req.header(header::AUTHORIZATION, auth_header_content); |
| |
| req = req.header(header::CONTENT_TYPE, "application/json"); |
| |
| let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; |
| let asyn_body = AsyncBody::Bytes(Bytes::from(body_bytes)); |
| let req = req.body(asyn_body).map_err(new_request_build_error)?; |
| |
| self.client.send(req).await |
| } |
| |
| async fn onedrive_create_dir( |
| &self, |
| url: &str, |
| body: CreateDirPayload, |
| ) -> Result<Response<IncomingAsyncBody>> { |
| let mut req = Request::post(url); |
| |
| let auth_header_content = format!("Bearer {}", self.access_token); |
| req = req.header(header::AUTHORIZATION, auth_header_content); |
| req = req.header(header::CONTENT_TYPE, "application/json"); |
| |
| let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; |
| let async_body = AsyncBody::Bytes(bytes::Bytes::from(body_bytes)); |
| let req = req.body(async_body).map_err(new_request_build_error)?; |
| |
| self.client.send(req).await |
| } |
| |
| pub(crate) async fn onedrive_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { |
| let path = build_abs_path(&self.root, path); |
| let url = format!( |
| "https://graph.microsoft.com/v1.0/me/drive/root:/{}", |
| percent_encode_path(&path) |
| ); |
| |
| let mut req = Request::delete(&url); |
| |
| let auth_header_content = format!("Bearer {}", self.access_token); |
| req = req.header(header::AUTHORIZATION, auth_header_content); |
| |
| let req = req |
| .body(AsyncBody::Empty) |
| .map_err(new_request_build_error)?; |
| |
| self.client.send(req).await |
| } |
| } |