blob: 29e2d783b037dbea5d8e3f0f19bf3e45bd693989 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::Debug;
use std::fmt::Formatter;
use std::time::Duration;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
use http::header::{CACHE_CONTROL, CONTENT_DISPOSITION};
use http::Request;
use http::Response;
use reqsign::HuaweicloudObsCredential;
use reqsign::HuaweicloudObsCredentialLoader;
use reqsign::HuaweicloudObsSigner;
use crate::raw::*;
use crate::*;
/// Shared state used by the OBS request helpers in this file to build, sign
/// and send HTTP requests.
pub struct ObsCore {
/// Bucket name; used as the first segment of the copy-source path.
pub bucket: String,
/// Root handed to `build_abs_path` together with each caller-supplied path.
pub root: String,
/// Base URL that every request URL built here starts with.
pub endpoint: String,
/// Signer applied to outgoing requests (header signing and query signing).
pub signer: HuaweicloudObsSigner,
/// Credential source consulted before every signing attempt.
pub loader: HuaweicloudObsCredentialLoader,
/// HTTP client used to send the requests.
pub client: HttpClient,
}
/// Debug output shows only `root`, `bucket` and `endpoint`; the signer,
/// credential loader and HTTP client are omitted and the struct is rendered
/// as non-exhaustive.
impl Debug for ObsCore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Backend");
        out.field("root", &self.root);
        out.field("bucket", &self.bucket);
        out.field("endpoint", &self.endpoint);
        out.finish_non_exhaustive()
    }
}
impl ObsCore {
    /// Loads a credential from `self.loader`.
    ///
    /// Returns `Ok(None)` when the loader yields no credential.
    ///
    /// # Errors
    ///
    /// A loader failure is converted with `new_request_credential_error`.
    async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> {
        // The loader already yields an `Option`, so it is forwarded as-is
        // instead of being unwrapped and re-wrapped.
        let cred = self
            .loader
            .load()
            .await
            .map_err(new_request_credential_error)?;

        Ok(cred)
    }

    /// Signs `req` in place with the loaded credential.
    ///
    /// When no credential is available the request is left untouched and
    /// `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Fails if the credential cannot be loaded or the signer rejects the
    /// request (converted with `new_request_sign_error`).
    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
        match self.load_credential().await? {
            Some(cred) => self.signer.sign(req, &cred).map_err(new_request_sign_error),
            None => Ok(()),
        }
    }

    /// Signs `req` in place through the signer's `sign_query`, passing
    /// `duration` along.
    ///
    /// When no credential is available the request is left untouched and
    /// `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Fails if the credential cannot be loaded or the signer rejects the
    /// request (converted with `new_request_sign_error`).
    pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
        match self.load_credential().await? {
            Some(cred) => self
                .signer
                .sign_query(req, duration, &cred)
                .map_err(new_request_sign_error),
            None => Ok(()),
        }
    }

    /// Sends `req` through `self.client` and returns its response.
    #[inline]
    pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
        self.client.send(req).await
    }
}
impl ObsCore {
/// Reads an object: builds the request with `obs_get_object_request`,
/// signs it and sends it.
///
/// # Errors
///
/// Propagates any error from building, signing or sending the request.
pub async fn obs_get_object(
    &self,
    path: &str,
    range: BytesRange,
    if_match: Option<&str>,
    if_none_match: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
    let mut request = self.obs_get_object_request(path, range, if_match, if_none_match)?;

    self.sign(&mut request).await?;

    self.send(request).await
}
/// Builds an unsigned `GET` request for the object at `path`.
///
/// Optional headers are added in this order: `If-Match`, `Range` (only when
/// `range` is not the full range), `If-None-Match`. The body is empty.
///
/// # Errors
///
/// A request-builder failure is converted with `new_request_build_error`.
pub fn obs_get_object_request(
    &self,
    path: &str,
    range: BytesRange,
    if_match: Option<&str>,
    if_none_match: Option<&str>,
) -> Result<Request<AsyncBody>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));

    let builder = Request::get(&url);

    let builder = match if_match {
        Some(etag) => builder.header(IF_MATCH, etag),
        None => builder,
    };

    // A full range needs no `Range` header at all.
    let builder = if range.is_full() {
        builder
    } else {
        builder.header(http::header::RANGE, range.to_header())
    };

    let builder = match if_none_match {
        Some(etag) => builder.header(IF_NONE_MATCH, etag),
        None => builder,
    };

    let request = builder
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)?;

    Ok(request)
}
/// Builds an unsigned `PUT` request that uploads `body` to the object at
/// `path`.
///
/// Optional headers are added in this order: `Content-Length` (from `size`),
/// `Cache-Control`, `Content-Type`.
///
/// # Errors
///
/// A request-builder failure is converted with `new_request_build_error`.
pub fn obs_put_object_request(
    &self,
    path: &str,
    size: Option<usize>,
    content_type: Option<&str>,
    cache_control: Option<&str>,
    body: AsyncBody,
) -> Result<Request<AsyncBody>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));

    let builder = Request::put(&url);

    let builder = match size {
        Some(len) => builder.header(CONTENT_LENGTH, len),
        None => builder,
    };

    let builder = match cache_control {
        Some(directive) => builder.header(CACHE_CONTROL, directive),
        None => builder,
    };

    let builder = match content_type {
        Some(mime) => builder.header(CONTENT_TYPE, mime),
        None => builder,
    };

    let request = builder.body(body).map_err(new_request_build_error)?;

    Ok(request)
}
/// Issues a `HEAD` for an object: builds the request with
/// `obs_head_object_request`, signs it and sends it.
///
/// # Errors
///
/// Propagates any error from building, signing or sending the request.
pub async fn obs_head_object(
    &self,
    path: &str,
    if_match: Option<&str>,
    if_none_match: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
    let mut request = self.obs_head_object_request(path, if_match, if_none_match)?;

    self.sign(&mut request).await?;

    self.send(request).await
}
/// Builds an unsigned `HEAD` request for the object at `path`.
///
/// `If-Match` is added first and `If-None-Match` second, each only when
/// provided. The body is empty.
///
/// # Errors
///
/// A request-builder failure is converted with `new_request_build_error`.
pub fn obs_head_object_request(
    &self,
    path: &str,
    if_match: Option<&str>,
    if_none_match: Option<&str>,
) -> Result<Request<AsyncBody>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));

    // No `Origin` header is sent: it is optional for this API. The official
    // documentation is mistaken on this point, as confirmed with Huawei Cloud
    // customer service.
    // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0084.html
    let builder = Request::head(&url);

    let builder = match if_match {
        Some(etag) => builder.header(IF_MATCH, etag),
        None => builder,
    };

    let builder = match if_none_match {
        Some(etag) => builder.header(IF_NONE_MATCH, etag),
        None => builder,
    };

    let request = builder
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)?;

    Ok(request)
}
/// Sends a signed `DELETE` request with an empty body for the object at
/// `path`.
///
/// # Errors
///
/// Propagates any error from building, signing or sending the request.
pub async fn obs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
    let abs_path = build_abs_path(&self.root, path);
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&abs_path));

    let mut request = Request::delete(&url)
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)?;

    self.sign(&mut request).await?;

    self.send(request).await
}
/// Builds an unsigned `POST` request that appends `body` to the object at
/// `path`, using the `?append&position=<position>` query.
///
/// `Content-Length` is always set from `size`. `Content-Type`,
/// `Content-Disposition` and `Cache-Control` follow in that order, each only
/// when `args` provides a value.
///
/// # Errors
///
/// A request-builder failure is converted with `new_request_build_error`.
pub fn obs_append_object_request(
    &self,
    path: &str,
    position: u64,
    size: usize,
    args: &OpAppend,
    body: AsyncBody,
) -> Result<Request<AsyncBody>> {
    let abs_path = build_abs_path(&self.root, path);
    let encoded_path = percent_encode_path(&abs_path);
    let url = format!(
        "{}/{}?append&position={}",
        self.endpoint, encoded_path, position
    );

    let builder = Request::post(&url).header(CONTENT_LENGTH, size);

    let builder = match args.content_type() {
        Some(mime) => builder.header(CONTENT_TYPE, mime),
        None => builder,
    };

    let builder = match args.content_disposition() {
        Some(disposition) => builder.header(CONTENT_DISPOSITION, disposition),
        None => builder,
    };

    let builder = match args.cache_control() {
        Some(directive) => builder.header(CACHE_CONTROL, directive),
        None => builder,
    };

    let request = builder.body(body).map_err(new_request_build_error)?;

    Ok(request)
}
/// Copies the object at `from` to `to` with a signed `PUT` carrying the
/// `x-obs-copy-source` header.
///
/// The source is addressed as `/{bucket}/{key}`, where the key is the
/// percent-encoded absolute path of `from`. The destination is the request
/// URL built from `to`.
///
/// # Errors
///
/// Propagates any error from building, signing or sending the request.
pub async fn obs_copy_object(
    &self,
    from: &str,
    to: &str,
) -> Result<Response<IncomingAsyncBody>> {
    let source = build_abs_path(&self.root, from);
    let target = build_abs_path(&self.root, to);

    // The source key is percent-encoded exactly once, here. Encoding the
    // assembled header value again would re-escape the `%` of every escape
    // sequence (assuming `percent_encode_path` escapes `%`), so the service
    // would be asked to copy a key that does not exist.
    let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
    let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));

    let mut req = Request::put(&url)
        .header("x-obs-copy-source", source)
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)?;

    self.sign(&mut req).await?;
    self.send(req).await
}
/// Lists objects with a signed `GET` on the endpoint.
///
/// Query parameters are added only when they carry a value:
/// - `prefix`: the absolute path of `path`, added when `path` is non-empty
///   (the check is on the caller-supplied `path`, not on the absolute path);
/// - `delimiter`: added when `delimiter` is non-empty;
/// - `max-keys`: added when `limit` is `Some`;
/// - `marker`: added when `next_marker` is non-empty.
///
/// # Errors
///
/// Propagates any error from building, signing or sending the request.
pub async fn obs_list_objects(
    &self,
    path: &str,
    next_marker: &str,
    delimiter: &str,
    limit: Option<usize>,
) -> Result<Response<IncomingAsyncBody>> {
    let p = build_abs_path(&self.root, path);

    let mut queries = vec![];
    if !path.is_empty() {
        queries.push(format!("prefix={}", percent_encode_path(&p)));
    }
    if !delimiter.is_empty() {
        // Encoded like `prefix`, so a delimiter holding reserved characters
        // cannot alter the query string.
        queries.push(format!("delimiter={}", percent_encode_path(delimiter)));
    }
    if let Some(limit) = limit {
        queries.push(format!("max-keys={limit}"));
    }
    if !next_marker.is_empty() {
        // The marker is an object key returned by a previous listing and may
        // contain `&`, `#`, spaces or non-ASCII characters. Left raw, these
        // would truncate the value or produce an invalid URL.
        queries.push(format!("marker={}", percent_encode_path(next_marker)));
    }

    let url = if queries.is_empty() {
        self.endpoint.to_string()
    } else {
        format!("{}?{}", self.endpoint, queries.join("&"))
    };

    let mut req = Request::get(&url)
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)?;

    self.sign(&mut req).await?;
    self.send(req).await
}
}