vHive engine #64

Open
wants to merge 1 commit into master
1 change: 1 addition & 0 deletions src/execution/Makefile.am
@@ -13,6 +13,7 @@ libggexecution_a_SOURCES = response.hh response.cc \
engine_lambda.hh engine_lambda.cc \
engine_gg.hh engine_gg.cc \
engine_gcloud.hh engine_gcloud.cc \
engine_vhive.hh engine_vhive.cc \
meow/message.hh meow/message.cc \
meow/util.hh meow/util.cc \
engine_meow.hh engine_meow.cc \
122 changes: 122 additions & 0 deletions src/execution/engine_vhive.cc
@@ -0,0 +1,122 @@
/* -*-mode:c++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */

#include "engine_vhive.hh"

#include <stdexcept>

#include "response.hh"
#include "net/http_response.hh"
#include "thunk/ggutils.hh"
#include "util/optional.hh"
#include "util/system_runner.hh"
#include "util/units.hh"
#include "util/base64.hh"

using namespace std;
using namespace gg;
using namespace gg::thunk;

VHiveExecutionEngine::VHiveExecutionEngine( const size_t max_jobs, const string & url )
: ExecutionEngine( max_jobs ), parsed_url_( url ), address_( parsed_url_.host, parsed_url_.protocol ),
final_url_( parsed_url_.host )
{
  /* if a port is specified, include it in the Host header and in the address we connect to */
  if ( parsed_url_.port.initialized() ) {
    const string port_string = to_string( parsed_url_.port.get() );
    final_url_ = parsed_url_.host + ":" + port_string;
    address_ = Address( parsed_url_.host, port_string );
  }
}

HTTPRequest VHiveExecutionEngine::generate_request( const Thunk & thunk )
{
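  /* the serialized execution request for this thunk; it becomes the body of the POST below */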
string payload = Thunk::execution_payload( thunk );
HTTPRequest request;
request.set_first_line( "POST / HTTP/1.1" );
request.add_header( HTTPHeader{ "Host", final_url_ } );
request.add_header( HTTPHeader{ "Content-Length", to_string( payload.size() ) } );
request.add_header( HTTPHeader{ "Content-Type", "application/json" } );
request.add_header( HTTPHeader{ "Accept", "*/*" } );
request.done_with_headers();

request.read_in_body( payload );
assert( request.state() == COMPLETE );

return request;
}

void VHiveExecutionEngine::force_thunk( const Thunk & thunk,
ExecutionLoop & exec_loop )
{
HTTPRequest request = generate_request( thunk );
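  /* fire the request off over a TCP connection: the first lambda handles the HTTP
     response for this thunk, the second handles connection-level failures */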
exec_loop.make_http_request<TCPConnection>( thunk.hash(),
address_, request,
[this] ( const uint64_t, const string & thunk_hash,
const HTTPResponse & http_response ) -> bool
{
running_jobs_--;

if ( http_response.status_code() != "200" ) {
failure_callback_( thunk_hash, JobStatus::InvocationFailure );
return false;
}

ExecutionResponse response = ExecutionResponse::parse_message( http_response.body() );

/* print the output, if there's any */
if ( response.stdout.length() ) {
cerr << response.stdout << endl;
}

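      /* an HTTP 200 can still carry a failed execution; dispatch on the reported job status */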
switch ( response.status ) {
case JobStatus::Success:
{
if ( response.thunk_hash != thunk_hash ) {
cerr << http_response.str() << endl;
throw runtime_error( "expected output for " +
thunk_hash + ", got output for " +
response.thunk_hash );
}

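        /* record every returned output in the cache and materialize any
           inlined (base64-encoded) data as a local blob */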
for ( const auto & output : response.outputs ) {
gg::cache::insert( gg::hash::for_output( response.thunk_hash, output.tag ), output.hash );

if ( output.data.length() ) {
roost::atomic_create( base64::decode( output.data ),
gg::paths::blob( output.hash ) );
}
}

gg::cache::insert( response.thunk_hash, response.outputs.at( 0 ).hash );

vector<ThunkOutput> thunk_outputs;
for ( auto & output : response.outputs ) {
thunk_outputs.emplace_back( move( output.hash ), move( output.tag ) );
}

success_callback_( response.thunk_hash, move( thunk_outputs ), 0 );

break;
}

default: /* in case of any other failure */
failure_callback_( thunk_hash, response.status );
}

return false;
},
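    /* connection-level failure: the request never produced an HTTP response */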
[this] ( const uint64_t, const string & thunk_hash )
{
failure_callback_( thunk_hash, JobStatus::SocketFailure );
}
);

running_jobs_++;
}

size_t VHiveExecutionEngine::job_count() const
{
return running_jobs_;
}
34 changes: 34 additions & 0 deletions src/execution/engine_vhive.hh
@@ -0,0 +1,34 @@
/* -*-mode:c++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */

#ifndef ENGINE_VHIVE_HH
#define ENGINE_VHIVE_HH

#include "engine.hh"
#include "net/http_request.hh"
#include "thunk/thunk.hh"
#include "util/uri.hh"

class VHiveExecutionEngine : public ExecutionEngine
{
private:
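  /* parsed form of the user-supplied URL, the address we connect to, and the
     host[:port] string used for the Host header */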
ParsedURI parsed_url_;
Address address_;
std::string final_url_;

size_t running_jobs_ { 0 };

HTTPRequest generate_request( const gg::thunk::Thunk & thunk );

public:
VHiveExecutionEngine( const size_t max_jobs, const std::string & url );

void force_thunk( const gg::thunk::Thunk & thunk,
ExecutionLoop & exec_loop ) override;
size_t job_count() const override;

bool is_remote() const { return true; }
std::string label() const override { return "vhive"; }
bool can_execute( const gg::thunk::Thunk & ) const { return true; }
};

#endif /* ENGINE_VHIVE_HH */
7 changes: 6 additions & 1 deletion src/frontend/gg-force.cc
@@ -23,6 +23,7 @@
#include "execution/engine_gg.hh"
#include "execution/engine_meow.hh"
#include "execution/engine_gcloud.hh"
#include "execution/engine_vhive.hh"
#include "tui/status_bar.hh"
#include "util/digest.hh"
#include "util/exception.hh"
@@ -58,6 +59,7 @@ void usage( const char * argv0 )
<< " - remote Executes the jobs on a remote machine" << endl
<< " - meow Executes the jobs on AWS Lambda with long-running workers" << endl
<< " - gcloud Executes the jobs on Google Cloud Functions" << endl
<< " - vhive Executes the jobs on VHive Server" << endl
<< endl
<< "Environment variables:" << endl
<< " - " << FORCE_NO_STATUS << endl
@@ -100,7 +102,6 @@ unique_ptr<ExecutionEngine> make_execution_engine( const EngineInfo & engine )
const string & engine_name = get<0>( engine );
const string & engine_params = get<1>( engine );
const size_t max_jobs = get<2>( engine );

if ( engine_name == "local" ) {
const bool mixed = (engine_params == "mixed");
return make_unique<LocalExecutionEngine>( mixed, max_jobs );
@@ -144,6 +145,10 @@ unique_ptr<ExecutionEngine> make_execution_engine( const EngineInfo & engine )
return make_unique<GCFExecutionEngine>( max_jobs,
safe_getenv("GG_GCLOUD_FUNCTION") );
}
else if ( engine_name == "vhive" ) {
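    /* engine_params carries the vHive endpoint as a URL (host with an optional port) */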
string url = engine_params;
return make_unique<VHiveExecutionEngine>( max_jobs, url );
}
else {
throw runtime_error( "unknown execution engine" );
}