blob: 0aa376337a818e797328eff4fa47cd63b7536fb1 [file] [log] [blame]
// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
#include <array>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "quiche/common/capsule.h"
#include "quiche/common/http/http_header_block.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_status_utils.h"
#include "quiche/common/quiche_stream.h"
#include "quiche/web_transport/web_transport.h"
namespace webtransport {
namespace {
using ::quiche::Capsule;
using ::quiche::CapsuleType;
using ::quiche::CloseWebTransportSessionCapsule;
// This is arbitrary, since we don't have any real MTU restriction when running
// over TCP.
constexpr uint64_t kEncapsulatedMaxDatagramSize = 9000;
} // namespace
EncapsulatedSession::EncapsulatedSession(
Perspective perspective, FatalErrorCallback fatal_error_callback)
: perspective_(perspective),
fatal_error_callback_(std::move(fatal_error_callback)),
capsule_parser_(this) {}
void EncapsulatedSession::InitializeClient(
std::unique_ptr<SessionVisitor> visitor,
quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
quiche::ReadStream* reader) {
if (state_ != kUninitialized) {
OnFatalError("Called InitializeClient() in an invalid state");
return;
}
if (perspective_ != Perspective::kClient) {
OnFatalError("Called InitializeClient() on a server session");
return;
}
visitor_ = std::move(visitor);
writer_ = writer;
reader_ = reader;
state_ = kWaitingForHeaders;
}
void EncapsulatedSession::InitializeServer(
std::unique_ptr<SessionVisitor> visitor,
const quiche::HttpHeaderBlock& /*incoming_headers*/,
quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
quiche::ReadStream* reader) {
if (state_ != kUninitialized) {
OnFatalError("Called InitializeServer() in an invalid state");
return;
}
if (perspective_ != Perspective::kServer) {
OnFatalError("Called InitializeServer() on a client session");
return;
}
visitor_ = std::move(visitor);
writer_ = writer;
reader_ = reader;
OpenSession();
}
void EncapsulatedSession::ProcessIncomingServerHeaders(
const quiche::HttpHeaderBlock& /*headers*/) {
if (state_ != kWaitingForHeaders) {
OnFatalError("Called ProcessIncomingServerHeaders() in an invalid state");
return;
}
OpenSession();
}
void EncapsulatedSession::CloseSession(SessionErrorCode error_code,
absl::string_view error_message) {
switch (state_) {
case kUninitialized:
case kWaitingForHeaders:
OnFatalError(absl::StrCat(
"Attempted to close a session before it opened with error 0x",
absl::Hex(error_code), ": ", error_message));
return;
case kSessionClosing:
case kSessionClosed:
OnFatalError(absl::StrCat(
"Attempted to close a session that is already closed with error 0x",
absl::Hex(error_code), ": ", error_message));
return;
case kSessionOpen:
break;
}
state_ = kSessionClosing;
buffered_session_close_ =
BufferedClose{error_code, std::string(error_message)};
OnCanWrite();
}
Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() {
return nullptr;
}
Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() {
return nullptr;
}
bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() {
return false;
}
bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() {
return false;
}
Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() {
return nullptr;
}
Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() {
return nullptr;
}
Stream* EncapsulatedSession::GetStreamById(StreamId /*id*/) { return nullptr; }
DatagramStats EncapsulatedSession::GetDatagramStats() {
DatagramStats stats;
stats.expired_outgoing = 0;
stats.lost_outgoing = 0;
return stats;
}
SessionStats EncapsulatedSession::GetSessionStats() {
// We could potentially get stats via tcp_info and similar mechanisms, but
// that would require us knowing what the underlying socket is.
return SessionStats();
}
void EncapsulatedSession::NotifySessionDraining() {
control_capsule_queue_.push_back(quiche::SerializeCapsule(
quiche::Capsule(quiche::DrainWebTransportSessionCapsule()), allocator_));
OnCanWrite();
}
void EncapsulatedSession::SetOnDraining(
quiche::SingleUseCallback<void()> callback) {
draining_callback_ = std::move(callback);
}
DatagramStatus EncapsulatedSession::SendOrQueueDatagram(
absl::string_view datagram) {
if (datagram.size() > GetMaxDatagramSize()) {
return DatagramStatus{
DatagramStatusCode::kTooBig,
absl::StrCat("Datagram is ", datagram.size(),
" bytes long, while the specified maximum size is ",
GetMaxDatagramSize())};
}
bool write_blocked;
switch (state_) {
case kUninitialized:
write_blocked = true;
break;
// We can send datagrams before receiving any headers from the peer, since
// datagrams are not subject to queueing.
case kWaitingForHeaders:
case kSessionOpen:
write_blocked = !writer_->CanWrite();
break;
case kSessionClosing:
case kSessionClosed:
return DatagramStatus{DatagramStatusCode::kInternalError,
"Writing into an already closed session"};
}
if (write_blocked) {
// TODO: this *may* be useful to split into a separate queue.
control_capsule_queue_.push_back(
quiche::SerializeCapsule(Capsule::Datagram(datagram), allocator_));
return DatagramStatus{DatagramStatusCode::kSuccess, ""};
}
// We could always write via OnCanWrite() above, but the optimistic path below
// allows us to avoid a copy.
quiche::QuicheBuffer buffer =
quiche::SerializeDatagramCapsuleHeader(datagram.size(), allocator_);
std::array spans = {buffer.AsStringView(), datagram};
absl::Status write_status =
writer_->Writev(absl::MakeConstSpan(spans), quiche::StreamWriteOptions());
if (!write_status.ok()) {
OnWriteError(write_status);
return DatagramStatus{
DatagramStatusCode::kInternalError,
absl::StrCat("Write error for datagram: ", write_status.ToString())};
}
return DatagramStatus{DatagramStatusCode::kSuccess, ""};
}
uint64_t EncapsulatedSession::GetMaxDatagramSize() const {
return kEncapsulatedMaxDatagramSize;
}
void EncapsulatedSession::SetDatagramMaxTimeInQueue(
absl::Duration /*max_time_in_queue*/) {
// TODO(b/264263113): implement this (requires having a mockable clock).
}
void EncapsulatedSession::OnCanWrite() {
if (state_ == kUninitialized || !writer_) {
OnFatalError("Trying to write before the session is initialized");
return;
}
if (state_ == kSessionClosed) {
OnFatalError("Trying to write before the session is closed");
return;
}
if (state_ == kSessionClosing) {
if (writer_->CanWrite()) {
CloseWebTransportSessionCapsule capsule{
buffered_session_close_.error_code,
buffered_session_close_.error_message};
quiche::QuicheBuffer buffer =
quiche::SerializeCapsule(Capsule(std::move(capsule)), allocator_);
absl::Status write_status = SendFin(buffer.AsStringView());
if (!write_status.ok()) {
OnWriteError(quiche::AppendToStatus(write_status,
" while writing WT_CLOSE_SESSION"));
return;
}
OnSessionClosed(buffered_session_close_.error_code,
buffered_session_close_.error_message);
}
return;
}
while (writer_->CanWrite() && !control_capsule_queue_.empty()) {
absl::Status write_status = quiche::WriteIntoStream(
*writer_, control_capsule_queue_.front().AsStringView());
if (!write_status.ok()) {
OnWriteError(write_status);
return;
}
control_capsule_queue_.pop_front();
}
// TODO(b/264263113): send stream data.
}
void EncapsulatedSession::OnCanRead() {
if (state_ == kSessionClosed || state_ == kSessionClosing) {
return;
}
bool has_fin = quiche::ProcessAllReadableRegions(
*reader_, [&](absl::string_view fragment) {
capsule_parser_.IngestCapsuleFragment(fragment);
});
if (has_fin) {
capsule_parser_.ErrorIfThereIsRemainingBufferedData();
OnSessionClosed(0, "");
}
}
bool EncapsulatedSession::OnCapsule(const quiche::Capsule& capsule) {
switch (capsule.capsule_type()) {
case CapsuleType::DATAGRAM:
visitor_->OnDatagramReceived(
capsule.datagram_capsule().http_datagram_payload);
break;
case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
if (draining_callback_) {
std::move(draining_callback_)();
}
break;
case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
OnSessionClosed(
capsule.close_web_transport_session_capsule().error_code,
std::string(
capsule.close_web_transport_session_capsule().error_message));
break;
default:
break;
}
return true;
}
void EncapsulatedSession::OnCapsuleParseFailure(
absl::string_view error_message) {
OnFatalError(absl::StrCat("Stream parse error: ", error_message));
}
void EncapsulatedSession::OpenSession() {
state_ = kSessionOpen;
visitor_->OnSessionReady();
OnCanWrite();
OnCanRead();
}
absl::Status EncapsulatedSession::SendFin(absl::string_view data) {
QUICHE_DCHECK(!fin_sent_);
fin_sent_ = true;
quiche::StreamWriteOptions options;
options.set_send_fin(true);
return quiche::WriteIntoStream(*writer_, data, options);
}
void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code,
const std::string& error_message) {
if (!fin_sent_) {
absl::Status status = SendFin("");
if (!status.ok()) {
OnWriteError(status);
return;
}
}
if (session_close_notified_) {
QUICHE_DCHECK_EQ(state_, kSessionClosed);
return;
}
state_ = kSessionClosed;
session_close_notified_ = true;
if (visitor_ != nullptr) {
visitor_->OnSessionClosed(error_code, error_message);
}
}
void EncapsulatedSession::OnFatalError(absl::string_view error_message) {
QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: "
<< error_message;
state_ = kSessionClosed;
if (fatal_error_callback_) {
std::move(fatal_error_callback_)(error_message);
}
}
void EncapsulatedSession::OnWriteError(absl::Status error) {
OnFatalError(absl::StrCat(
error, " while trying to write encapsulated WebTransport data"));
}
} // namespace webtransport