Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor our connection code for worker proto like serve proto #10782

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 14 additions & 24 deletions src/libstore/daemon.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "daemon.hh"
#include "signals.hh"
#include "worker-protocol.hh"
#include "worker-protocol-connection.hh"
#include "worker-protocol-impl.hh"
#include "build-result.hh"
#include "store-api.hh"
Expand Down Expand Up @@ -1026,11 +1027,9 @@ void processConnection(
#endif

/* Exchange the greeting. */
unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
to << WORKER_MAGIC_2 << PROTOCOL_VERSION;
to.flush();
WorkerProto::Version clientVersion = readInt(from);
WorkerProto::Version clientVersion =
WorkerProto::BasicServerConnection::handshake(
to, from, PROTOCOL_VERSION);

if (clientVersion < 0x10a)
throw Error("the Nix client version is too old");
Expand All @@ -1048,29 +1047,20 @@ void processConnection(
printMsgUsing(prevLogger, lvlDebug, "%d operations", opCount);
});

if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && readInt(from)) {
// Obsolete CPU affinity.
readInt(from);
}

if (GET_PROTOCOL_MINOR(clientVersion) >= 11)
readInt(from); // obsolete reserveSpace

if (GET_PROTOCOL_MINOR(clientVersion) >= 33)
to << nixVersion;
WorkerProto::BasicServerConnection conn {
.to = to,
.from = from,
.clientVersion = clientVersion,
};

if (GET_PROTOCOL_MINOR(clientVersion) >= 35) {
conn.postHandshake(*store, {
.daemonNixVersion = nixVersion,
// We and the underlying store both need to trust the client for
// it to be trusted.
auto temp = trusted
.remoteTrustsUs = trusted
? store->isTrustedClient()
: std::optional { NotTrusted };
WorkerProto::WriteConn wconn {
.to = to,
.version = clientVersion,
};
WorkerProto::write(*store, wconn, temp);
}
: std::optional { NotTrusted },
});

/* Send startup error messages to the client. */
tunnelLogger->startWork();
Expand Down
52 changes: 25 additions & 27 deletions src/libstore/legacy-ssh-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "pool.hh"
#include "remote-store.hh"
#include "serve-protocol.hh"
#include "serve-protocol-connection.hh"
#include "serve-protocol-impl.hh"
#include "build-result.hh"
#include "store-api.hh"
Expand Down Expand Up @@ -86,7 +87,7 @@ ref<LegacySSHStore::Connection> LegacySSHStore::openConnection()
conn->sshConn->in.close();
{
NullSink nullSink;
conn->from.drainInto(nullSink);
tee.drainInto(nullSink);
}
throw Error("'nix-store --serve' protocol mismatch from '%s', got '%s'",
host, chomp(saved.s));
Expand Down Expand Up @@ -168,41 +169,38 @@ void LegacySSHStore::addToStore(const ValidPathInfo & info, Source & source,
}
conn->to.flush();

if (readInt(conn->from) != 1)
throw Error("failed to add path '%s' to remote host '%s'", printStorePath(info.path), host);

} else {

conn->to
<< ServeProto::Command::ImportPaths
<< 1;
try {
copyNAR(source, conn->to);
} catch (...) {
conn->good = false;
throw;
}
conn->to
<< exportMagic
<< printStorePath(info.path);
ServeProto::write(*this, *conn, info.references);
conn->to
<< (info.deriver ? printStorePath(*info.deriver) : "")
<< 0
<< 0;
conn->to.flush();
conn->importPaths(*this, [&](Sink & sink) {
try {
copyNAR(source, sink);
} catch (...) {
conn->good = false;
throw;
}
sink
<< exportMagic
<< printStorePath(info.path);
ServeProto::write(*this, *conn, info.references);
sink
<< (info.deriver ? printStorePath(*info.deriver) : "")
<< 0
<< 0;
});

}

if (readInt(conn->from) != 1)
throw Error("failed to add path '%s' to remote host '%s'", printStorePath(info.path), host);
}


void LegacySSHStore::narFromPath(const StorePath & path, Sink & sink)
{
auto conn(connections->get());

conn->to << ServeProto::Command::DumpStorePath << printStorePath(path);
conn->to.flush();
copyNAR(conn->from, sink);
conn->narFromPath(*this, path, [&](auto & source) {
copyNAR(source, sink);
});
}


Expand All @@ -226,7 +224,7 @@ BuildResult LegacySSHStore::buildDerivation(const StorePath & drvPath, const Bas

conn->putBuildDerivationRequest(*this, drvPath, drv, buildSettings());

return ServeProto::Serialise<BuildResult>::read(*this, *conn);
return conn->getBuildDerivationResponse(*this);
}


Expand Down
82 changes: 3 additions & 79 deletions src/libstore/remote-store-connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "remote-store.hh"
#include "worker-protocol.hh"
#include "worker-protocol-connection.hh"
#include "pool.hh"

namespace nix {
Expand All @@ -14,90 +15,13 @@ namespace nix {
* Contains `Source` and `Sink` for actual communication, along with
* other information learned when negotiating the connection.
*/
struct RemoteStore::Connection
struct RemoteStore::Connection : WorkerProto::BasicClientConnection,
WorkerProto::ClientHandshakeInfo
{
/**
* Send with this.
*/
FdSink to;

/**
* Receive with this.
*/
FdSource from;

/**
* Worker protocol version used for the connection.
*
* Despite its name, I think it is actually the maximum version both
* sides support. (If the maximum doesn't exist, we would fail to
* establish a connection and produce a value of this type.)
*/
WorkerProto::Version daemonVersion;

/**
* Whether the remote side trusts us or not.
*
* 3 values: "yes", "no", or `std::nullopt` for "unknown".
*
* Note that the "remote side" might not be just the end daemon, but
* also an intermediary forwarder that can make its own trusting
* decisions. This would be the intersection of all their trust
* decisions, since it takes only one link in the chain to start
* denying operations.
*/
std::optional<TrustedFlag> remoteTrustsUs;

/**
* The version of the Nix daemon that is processing our requests.
*
* Do note, it may or may not communicating with another daemon,
* rather than being an "end" `LocalStore` or similar.
*/
std::optional<std::string> daemonNixVersion;

/**
* Time this connection was established.
*/
std::chrono::time_point<std::chrono::steady_clock> startTime;

/**
* Coercion to `WorkerProto::ReadConn`. This makes it easy to use the
* factored out worker protocol searlizers with a
* `RemoteStore::Connection`.
*
* The worker protocol connection types are unidirectional, unlike
* this type.
*/
operator WorkerProto::ReadConn ()
{
return WorkerProto::ReadConn {
.from = from,
.version = daemonVersion,
};
}

/**
* Coercion to `WorkerProto::WriteConn`. This makes it easy to use the
* factored out worker protocol searlizers with a
* `RemoteStore::Connection`.
*
* The worker protocol connection types are unidirectional, unlike
* this type.
*/
operator WorkerProto::WriteConn ()
{
return WorkerProto::WriteConn {
.to = to,
.version = daemonVersion,
};
}

virtual ~Connection();

virtual void closeWrite() = 0;

std::exception_ptr processStderr(Sink * sink = 0, Source * source = 0, bool flush = true);
};

/**
Expand Down
Loading
Loading