// blob: dd56ae24611cf58568fe39d2c0fc7544d5452433 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;
use super::core::GcsCore;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
/// Writer that uploads an object through [`GcsCore`].
///
/// A write is either sent as one insert request (`write_oneshot`) or streamed
/// through a resumable upload whose location is kept in `location`.
pub struct GcsWriter {
/// Shared core used to build, sign and send every request.
core: Arc<GcsCore>,
/// Path of the object being written.
path: String,
/// Write options; `content_type()` and `content_length()` are read from it.
op: OpWrite,
/// Location returned when a resumable upload is initiated. `None` until an
/// upload has been started, and reset to `None` once `close` or `abort`
/// succeeds.
location: Option<String>,
/// Number of bytes already sent with `write_part`; advanced by
/// `write_fixed_size` after each successful part.
written: u64,
/// Bytes accepted by `write` that have not been uploaded yet.
buffer: oio::VectorCursor,
/// Size of every part sent before `close`; copied from
/// `core.write_fixed_size` in `new`.
write_fixed_size: usize,
}
impl GcsWriter {
pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
let write_fixed_size = core.write_fixed_size;
GcsWriter {
core,
path: path.to_string(),
op,
location: None,
written: 0,
buffer: oio::VectorCursor::new(),
write_fixed_size,
}
}
async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
let mut req = self.core.gcs_insert_object_request(
&percent_encode_path(&self.path),
Some(bs.len()),
self.op.content_type(),
AsyncBody::Bytes(bs),
)?;
self.core.sign(&mut req).await?;
let resp = self.core.send(req).await?;
let status = resp.status();
match status {
StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
_ => Err(parse_error(resp).await?),
}
}
async fn initiate_upload(&self) -> Result<String> {
let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?;
let status = resp.status();
match status {
StatusCode::OK => {
let bs = parse_location(resp.headers())?;
if let Some(location) = bs {
Ok(location.to_string())
} else {
Err(Error::new(
ErrorKind::Unexpected,
"location is not in the response header",
))
}
}
_ => Err(parse_error(resp).await?),
}
}
async fn write_part(&self, location: &str, bs: Bytes) -> Result<()> {
let mut req = self.core.gcs_upload_in_resumable_upload(
location,
bs.len() as u64,
self.written,
false,
AsyncBody::Bytes(bs),
)?;
self.core.sign(&mut req).await?;
let resp = self.core.send(req).await?;
let status = resp.status();
match status {
StatusCode::OK | StatusCode::PERMANENT_REDIRECT => Ok(()),
_ => Err(parse_error(resp).await?),
}
}
}
#[async_trait]
impl oio::Write for GcsWriter {
    /// Accepts `bs` for upload.
    ///
    /// When no resumable upload exists yet, nothing has been written, and
    /// `bs` has exactly the declared content length (a missing length counts
    /// as 0), the data is sent with `write_oneshot`. Otherwise a resumable
    /// upload is initiated if needed and `bs` is buffered; once the buffer
    /// holds more than `write_fixed_size` bytes, one part of exactly that
    /// size is uploaded.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying requests. If uploading a part
    /// fails, the chunk pushed by this call is removed from the buffer again
    /// so the same call can be retried.
    async fn write(&mut self, bs: Bytes) -> Result<()> {
        if self.location.is_none() {
            let covers_whole_object =
                self.op.content_length().unwrap_or_default() == bs.len() as u64;
            if covers_whole_object && self.written == 0 {
                return self.write_oneshot(bs).await;
            }

            let session = self.initiate_upload().await?;
            self.location = Some(session);
        }
        // Either it was already set or it has just been assigned above.
        let location = self.location.as_deref().unwrap();

        // Ignore empty bytes
        if bs.is_empty() {
            return Ok(());
        }

        self.buffer.push(bs);

        // Keep buffering until there is more than one full part available.
        if self.buffer.len() <= self.write_fixed_size {
            return Ok(());
        }

        // NOTE(review): `peak_exact` presumably copies out the leading bytes
        // without removing them; they are only dropped via `take` on success.
        let part = self.buffer.peak_exact(self.write_fixed_size);
        let outcome = self.write_part(location, part).await;

        if let Err(err) = outcome {
            // If the upload fails, we should pop the given bs to make sure
            // write is re-enter safe.
            self.buffer.pop();
            return Err(err);
        }

        self.buffer.take(self.write_fixed_size);
        self.written += self.write_fixed_size as u64;
        Ok(())
    }

    /// Cancels the resumable upload, if one was started.
    ///
    /// Does nothing and returns `Ok(())` when no upload location is stored.
    /// On success the stored location and the pending buffer are cleared.
    ///
    /// # Errors
    ///
    /// Any status other than 499 is turned into an error via `parse_error`.
    async fn abort(&mut self) -> Result<()> {
        let location = match &self.location {
            Some(url) => url,
            None => return Ok(()),
        };

        let resp = self.core.gcs_abort_resumable_upload(location).await?;

        // gcs returns 499 if the upload aborted successfully
        // reference: https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json
        if resp.status().as_u16() != 499 {
            return Err(parse_error(resp).await?);
        }

        resp.into_body().consume().await?;
        self.location = None;
        self.buffer.clear();
        Ok(())
    }

    /// Completes the resumable upload, if one was started.
    ///
    /// Everything still held in the buffer is handed to
    /// `gcs_complete_resumable_upload` together with `self.written`. Returns
    /// `Ok(())` immediately when no upload location is stored. On success the
    /// stored location and the buffer are cleared.
    ///
    /// # Errors
    ///
    /// Any status other than `200 OK` is turned into an error via
    /// `parse_error`.
    async fn close(&mut self) -> Result<()> {
        let location = match &self.location {
            Some(url) => url,
            None => return Ok(()),
        };

        let remaining = self.buffer.peak_exact(self.buffer.len());

        let resp = self
            .core
            .gcs_complete_resumable_upload(location, self.written, remaining)
            .await?;

        if resp.status() != StatusCode::OK {
            return Err(parse_error(resp).await?);
        }

        resp.into_body().consume().await?;
        self.location = None;
        self.buffer.clear();
        Ok(())
    }
}