Skip to content

Commit

Permalink
quic: rework TLSContext, additional cleanups
Browse files Browse the repository at this point in the history
PR-URL: #51340
Reviewed-By: Yagiz Nizipli <[email protected]>
Reviewed-By: Franziska Hinkelmann <[email protected]>
Reviewed-By: Stephen Belanger <[email protected]>
  • Loading branch information
jasnell authored and aduh95 committed Apr 29, 2024
1 parent 5b52a48 commit fb4edf7
Show file tree
Hide file tree
Showing 28 changed files with 1,255 additions and 910 deletions.
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@
'test/cctest/test_node_crypto.cc',
'test/cctest/test_node_crypto_env.cc',
'test/cctest/test_quic_cid.cc',
'test/cctest/test_quic_error.cc',
'test/cctest/test_quic_tokens.cc',
],
'node_cctest_inspector_sources': [
Expand Down
6 changes: 3 additions & 3 deletions src/crypto/crypto_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static const char system_cert_path[] = NODE_OPENSSL_SYSTEM_CERT_PATH;

static bool extra_root_certs_loaded = false;

inline X509_STORE* GetOrCreateRootCertStore() {
X509_STORE* GetOrCreateRootCertStore() {
// Guaranteed thread-safe by standard, just don't use -fno-threadsafe-statics.
static X509_STORE* store = NewRootCertStore();
return store;
Expand Down Expand Up @@ -140,6 +140,8 @@ int SSL_CTX_use_certificate_chain(SSL_CTX* ctx,
return ret;
}

} // namespace

// Read a file that contains our certificate in "PEM" format,
// possibly followed by a sequence of CA certificates that should be
// sent to the peer in the Certificate message.
Expand Down Expand Up @@ -194,8 +196,6 @@ int SSL_CTX_use_certificate_chain(SSL_CTX* ctx,
issuer);
}

} // namespace

X509_STORE* NewRootCertStore() {
static std::vector<X509*> root_certs_vector;
static Mutex root_certs_vector_mutex;
Expand Down
7 changes: 7 additions & 0 deletions src/crypto/crypto_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ void IsExtraRootCertsFileLoaded(

X509_STORE* NewRootCertStore();

X509_STORE* GetOrCreateRootCertStore();

BIOPointer LoadBIO(Environment* env, v8::Local<v8::Value> v);

class SecureContext final : public BaseObject {
Expand Down Expand Up @@ -153,6 +155,11 @@ class SecureContext final : public BaseObject {
unsigned char ticket_key_hmac_[16];
};

int SSL_CTX_use_certificate_chain(SSL_CTX* ctx,
BIOPointer&& in,
X509Pointer* cert,
X509Pointer* issuer);

} // namespace crypto
} // namespace node

Expand Down
240 changes: 151 additions & 89 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "application.h"
#include <async_wrap-inl.h>
#include <debug_utils-inl.h>
#include <ngtcp2/ngtcp2.h>
#include <node_bob.h>
#include <node_sockaddr-inl.h>
#include <uv.h>
Expand Down Expand Up @@ -95,6 +96,20 @@ Maybe<Session::Application_Options> Session::Application_Options::From(
return Just<Application_Options>(options);
}

// ============================================================================

std::string Session::Application::StreamData::ToString() const {
DebugIndentScope indent;
auto prefix = indent.Prefix();
std::string res("{");
res += prefix + "count: " + std::to_string(count);
res += prefix + "remaining: " + std::to_string(remaining);
res += prefix + "id: " + std::to_string(id);
res += prefix + "fin: " + std::to_string(fin);
res += indent.Close();
return res;
}

Session::Application::Application(Session* session, const Options& options)
: session_(session) {}

Expand Down Expand Up @@ -189,7 +204,7 @@ Packet* Session::Application::CreateStreamDataPacket() {
return Packet::Create(env(),
session_->endpoint_.get(),
session_->remote_address_,
ngtcp2_conn_get_max_tx_udp_payload_size(*session_),
session_->max_packet_size(),
"stream data");
}

Expand Down Expand Up @@ -221,141 +236,188 @@ void Session::Application::StreamReset(Stream* stream,
}

void Session::Application::SendPendingData() {
static constexpr size_t kMaxPackets = 32;
Debug(session_, "Application sending pending data");
PathStorage path;
StreamData stream_data;

Packet* packet = nullptr;
uint8_t* pos = nullptr;
int err = 0;
// The maximum size of packet to create.
const size_t max_packet_size = session_->max_packet_size();

size_t maxPacketCount = std::min(static_cast<size_t>(64000),
ngtcp2_conn_get_send_quantum(*session_));
size_t packetSendCount = 0;
// The maximum number of packets to send in this call to SendPendingData.
const size_t max_packet_count = std::min(
kMaxPackets, ngtcp2_conn_get_send_quantum(*session_) / max_packet_size);

const auto updateTimer = [&] {
Debug(session_, "Application updating the session timer");
ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime());
session_->UpdateTimer();
};
// The number of packets that have been sent in this call to SendPendingData.
size_t packet_send_count = 0;

const auto congestionLimited = [&](auto packet) {
auto len = pos - ngtcp2_vec(*packet).base;
// We are either congestion limited or done.
if (len) {
// Some data was serialized into the packet. We need to send it.
packet->Truncate(len);
session_->Send(std::move(packet), path);
}
Packet* packet = nullptr;
uint8_t* pos = nullptr;
uint8_t* begin = nullptr;

updateTimer();
auto ensure_packet = [&] {
if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (packet == nullptr) return false;
pos = begin = ngtcp2_vec(*packet).base;
}
DCHECK_NOT_NULL(packet);
DCHECK_NOT_NULL(pos);
DCHECK_NOT_NULL(begin);
return true;
};

// We're going to enter a loop here to prepare and send no more than
// max_packet_count packets.
for (;;) {
ssize_t ndatalen;
StreamData stream_data;

err = GetStreamData(&stream_data);
// ndatalen is the amount of stream data that was accepted into the packet.
ssize_t ndatalen = 0;

if (err < 0) {
// Make sure we have a packet to write data into.
if (!ensure_packet()) {
Debug(session_, "Failed to create packet for stream data");
// Doh! Could not create a packet. Time to bail.
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}

if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (packet == nullptr) {
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}
pos = ngtcp2_vec(*packet).base;
// The stream_data is the next block of data from the application stream.
if (GetStreamData(&stream_data) < 0) {
Debug(session_, "Application failed to get stream data");
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
packet->Done(UV_ECANCELED);
return session_->Close(Session::CloseMethod::SILENT);
}

ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data);
// If we got here, we were at least successful in checking for stream data.
// There might not be any stream data to send.
Debug(session_, "Application using stream data: %s", stream_data);

// Awesome, let's write our packet!
ssize_t nwrite =
WriteVStream(&path, pos, &ndatalen, max_packet_size, stream_data);
Debug(session_, "Application accepted %zu bytes into packet", ndatalen);

if (nwrite <= 0) {
// A negative nwrite value indicates either an error or that there is more
// data to write into the packet.
if (nwrite < 0) {
switch (nwrite) {
case 0:
if (stream_data.id >= 0) ResumeStream(stream_data.id);
return congestionLimited(std::move(packet));
case NGTCP2_ERR_STREAM_DATA_BLOCKED: {
session().StreamDataBlocked(stream_data.id);
if (session().max_data_left() == 0) {
if (stream_data.id >= 0) ResumeStream(stream_data.id);
return congestionLimited(std::move(packet));
}
CHECK_LE(ndatalen, 0);
// We could not write any data for this stream into the packet because
// the flow control for the stream itself indicates that the stream
// is blocked. We'll skip and move on to the next stream.
// ndatalen = -1 means that no stream data was accepted into the
// packet, which is what we want here.
DCHECK_EQ(ndatalen, -1);
DCHECK(stream_data.stream);
session_->StreamDataBlocked(stream_data.id);
continue;
}
case NGTCP2_ERR_STREAM_SHUT_WR: {
// Indicates that the writable side of the stream has been closed
// Indicates that the writable side of the stream should be closed
// locally or the stream is being reset. In either case, we can't send
// any stream data!
CHECK_GE(stream_data.id, 0);
// We need to notify the stream that the writable side has been closed
// and no more outbound data can be sent.
CHECK_LE(ndatalen, 0);
auto stream = session_->FindStream(stream_data.id);
if (stream) stream->EndWritable();
Debug(session_,
"Stream %" PRIi64 " should be closed for writing",
stream_data.id);
// ndatalen = -1 means that no stream data was accepted into the
// packet, which is what we want here.
DCHECK_EQ(ndatalen, -1);
DCHECK(stream_data.stream);
stream_data.stream->EndWritable();
continue;
}
case NGTCP2_ERR_WRITE_MORE: {
CHECK_GT(ndatalen, 0);
if (!StreamCommit(&stream_data, ndatalen)) return session_->Close();
pos += ndatalen;
// This return value indicates that we should call into WriteVStream
// again to write more data into the same packet.
Debug(session_, "Application should write more to packet");
DCHECK_GE(ndatalen, 0);
if (!StreamCommit(&stream_data, ndatalen)) {
packet->Done(UV_ECANCELED);
return session_->Close(CloseMethod::SILENT);
}
continue;
}
}

packet->Done(UV_ECANCELED);
session_->last_error_ = QuicError::ForNgtcp2Error(nwrite);
return session_->Close(Session::CloseMethod::SILENT);
}

pos += nwrite;
if (ndatalen > 0 && !StreamCommit(&stream_data, ndatalen)) {
// Since we are closing the session here, we don't worry about updating
// the pkt tx time. The failed StreamCommit should have updated the
// last_error_ appropriately.
// Some other type of error happened.
DCHECK_EQ(ndatalen, -1);
Debug(session_,
"Application encountered error while writing packet: %s",
ngtcp2_strerror(nwrite));
session_->SetLastError(QuicError::ForNgtcp2Error(nwrite));
packet->Done(UV_ECANCELED);
return session_->Close(Session::CloseMethod::SILENT);
} else if (ndatalen >= 0) {
// We wrote some data into the packet. We need to update the flow control
// by committing the data.
if (!StreamCommit(&stream_data, ndatalen)) {
packet->Done(UV_ECANCELED);
return session_->Close(CloseMethod::SILENT);
}
}

if (stream_data.id >= 0 && ndatalen < 0) ResumeStream(stream_data.id);
// When nwrite is zero, it means we are congestion limited.
// We should stop trying to send additional packets.
if (nwrite == 0) {
Debug(session_, "Congestion limited.");
// There might be a partial packet already prepared. If so, send it.
size_t datalen = pos - begin;
if (datalen) {
Debug(session_, "Packet has %zu bytes to send", datalen);
// At least some data had been written into the packet. We should send
// it.
packet->Truncate(datalen);
session_->Send(packet, path);
} else {
packet->Done(UV_ECANCELED);
}

packet->Truncate(nwrite);
session_->Send(std::move(packet), path);
// If there was stream data selected, we should reschedule it to try
// sending again.
if (stream_data.id >= 0) ResumeStream(stream_data.id);

pos = nullptr;
return session_->UpdatePacketTxTime();
}

if (++packetSendCount == maxPacketCount) {
break;
// At this point we have a packet prepared to send.
pos += nwrite;
size_t datalen = pos - begin;
Debug(session_, "Sending packet with %zu bytes", datalen);
packet->Truncate(datalen);
session_->Send(packet, path);

// If we have sent the maximum number of packets, we're done.
if (++packet_send_count == max_packet_count) {
return session_->UpdatePacketTxTime();
}
}

updateTimer();
// Prepare to loop back around to prepare a new packet.
packet = nullptr;
pos = begin = nullptr;
}
}

ssize_t Session::Application::WriteVStream(PathStorage* path,
uint8_t* buf,
uint8_t* dest,
ssize_t* ndatalen,
size_t max_packet_size,
const StreamData& stream_data) {
CHECK_LE(stream_data.count, kMaxVectorCount);
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE;
if (stream_data.remaining > 0) flags |= NGTCP2_WRITE_STREAM_FLAG_MORE;
DCHECK_LE(stream_data.count, kMaxVectorCount);
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
ssize_t ret = ngtcp2_conn_writev_stream(
*session_,
&path->path,
nullptr,
buf,
ngtcp2_conn_get_max_tx_udp_payload_size(*session_),
ndatalen,
flags,
stream_data.id,
stream_data.buf,
stream_data.count,
uv_hrtime());
return ret;
ngtcp2_pkt_info pi;
return ngtcp2_conn_writev_stream(*session_,
&path->path,
&pi,
dest,
max_packet_size,
ndatalen,
flags,
stream_data.id,
stream_data.buf,
stream_data.count,
uv_hrtime());
}

// The DefaultApplication is the default implementation of Session::Application
Expand Down
Loading

0 comments on commit fb4edf7

Please sign in to comment.