Skip to content

Commit

Permalink
Fix tensorflow_serving build when building against a recent hash of T…
Browse files Browse the repository at this point in the history
…ensorFlow

PiperOrigin-RevId: 300687304
  • Loading branch information
ebrevdo authored and tensorflow-copybara committed Mar 13, 2020
1 parent 726ca19 commit 3d5b44a
Show file tree
Hide file tree
Showing 24 changed files with 157 additions and 226 deletions.
33 changes: 25 additions & 8 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ workspace(name = "tf_serving")
# 3. Request the new archive to be mirrored on mirror.bazel.build for more
# reliable downloads.
load("//tensorflow_serving:repo.bzl", "tensorflow_http_archive")

tensorflow_http_archive(
name = "org_tensorflow",
sha256 = "3513fd2e31a9297452a257e687b88a4c9b44b983880f67f8469b5c6a62bec1d2",
git_commit = "7cf58aa514b348b0b44610555dd8e3002c32e999",
patch = "//third_party/tf_patch:tf.patch"
sha256 = "788bb65c12a79fd02746d614bd87485738d9e4c1aed79fde52ffea434e1c1050",
git_commit = "553bff39bbec23e0d7b792fd8b4383014ad0401f",
)

load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
Expand All @@ -24,11 +24,13 @@ http_archive(
)

load("@rules_pkg//:deps.bzl", "rules_pkg_dependencies")
rules_pkg_dependencies()

rules_pkg_dependencies()

load("@org_tensorflow//third_party/toolchains/preconfig/generate:archives.bzl",
"bazel_toolchains_archive")
load(
"@org_tensorflow//third_party/toolchains/preconfig/generate:archives.bzl",
"bazel_toolchains_archive",
)

bazel_toolchains_archive()

Expand All @@ -53,15 +55,30 @@ http_archive(
)
http_archive(
name = "bazel_skylib",
sha256 = "2ef429f5d7ce7111263289644d233707dba35e39696377ebab8b0bc701f7818e",
urls = ["https://github.com/bazelbuild/bazel-skylib/releases/download/0.8.0/bazel-skylib.0.8.0.tar.gz"],
sha256 = "1dde365491125a3db70731e25658dfdd3bc5dbdfd11b840b3e987ecf043c7ca0",
urls = [
"https://storage.googleapis.com/mirror.tensorflow.org/github.com/bazelbuild/bazel-skylib/releases/download/0.9.0/bazel_skylib-0.9.0.tar.gz",
"https://github.com/bazelbuild/bazel-skylib/releases/download/0.9.0/bazel_skylib-0.9.0.tar.gz",
],
) # https://github.com/bazelbuild/bazel-skylib/releases

# END: Upstream TensorFlow dependencies

# Please add all new TensorFlow Serving dependencies in workspace.bzl.
load("//tensorflow_serving:workspace.bzl", "tf_serving_workspace")

tf_serving_workspace()

# Specify the minimum required bazel version.
load("@org_tensorflow//tensorflow:version_check.bzl", "check_bazel_version_at_least")

check_bazel_version_at_least("2.0.0")

# GPRC deps, required to match TF's. Only after calling tf_serving_workspace()
load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")

grpc_deps()

load("@upb//bazel:repository_defs.bzl", "bazel_version_repository")

bazel_version_repository(name = "bazel_version")
15 changes: 8 additions & 7 deletions tensorflow_serving/batching/streaming_batch_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ class StreamingBatchScheduler : public BatchScheduler<TaskType> {

// Closes 'open_batch_' (unless it equals nullptr), and replaces it with a
// fresh open batch. Schedules the new batch on 'batch_threads_'.
void StartNewBatch() EXCLUSIVE_LOCKS_REQUIRED(mu_);
void StartNewBatch() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// Takes a snapshot of 'open_batch_num_', and schedules an event with
// 'batch_closer_' to close it at time 'close_time_micros' if it is still open
// at that time.
void ScheduleCloseOfCurrentOpenBatch(uint64 close_time_micros)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

const Options options_;

Expand All @@ -206,21 +206,22 @@ class StreamingBatchScheduler : public BatchScheduler<TaskType> {

// The batch that is currently open and into which new tasks can be added.
// Not owned here; owned by the batch thread pool.
Batch<TaskType>* open_batch_ GUARDED_BY(mu_) = nullptr;
Batch<TaskType>* open_batch_ TF_GUARDED_BY(mu_) = nullptr;

// The sequence number of 'open_batch_'. Incremented each time 'open_batch_'
// is assigned to a new (non-null) batch object.
int64 open_batch_num_ GUARDED_BY(mu_) = 0;
int64 open_batch_num_ TF_GUARDED_BY(mu_) = 0;

// The number of batches "in progress", i.e. batches that have been started
// but for which the process-batch callback hasn't finished. Note that this
// counter is somewhat conservative (i.e. might be an overestimate), because
// it gets decremented after the callback finishes and there could be races.
int num_batches_in_progress_ GUARDED_BY(mu_) = 0;
int num_batches_in_progress_ TF_GUARDED_BY(mu_) = 0;

// A background task we use to schedule batches to close when they hit their
// timeout.
std::unique_ptr<internal::SingleTaskScheduler> batch_closer_ GUARDED_BY(mu_);
std::unique_ptr<internal::SingleTaskScheduler> batch_closer_
TF_GUARDED_BY(mu_);

TF_DISALLOW_COPY_AND_ASSIGN(StreamingBatchScheduler);
};
Expand Down Expand Up @@ -276,7 +277,7 @@ class SingleTaskScheduler {
};

// A newly-scheduled task hasn't yet been picked up by 'thread_'.
optional<Task> updated_task_ GUARDED_BY(mu_);
optional<Task> updated_task_ TF_GUARDED_BY(mu_);

// The time parameter passed in the most recent Schedule() invocation.
// Used to enforce monotonicity.
Expand Down
20 changes: 10 additions & 10 deletions tensorflow_serving/core/aspired_versions_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,49 +230,49 @@ class AspiredVersionsManager : public Manager,
void EnqueueAspiredVersionsRequest(
const StringPiece servable_name,
std::vector<ServableData<std::unique_ptr<Loader>>> versions)
LOCKS_EXCLUDED(pending_aspired_versions_requests_mu_);
TF_LOCKS_EXCLUDED(pending_aspired_versions_requests_mu_);

// Processes an aspired-versions request. It assumes the request doesn't
// re-aspire any servables currently marked as not aspired in
// 'basic_manager_'.
void ProcessAspiredVersionsRequest(
const StringPiece servable_name,
std::vector<ServableData<std::unique_ptr<Loader>>> versions)
EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);

// Determines whether an aspired-versions request contains any versions that
// are currently being managed in 'basic_manager_' with is_aspired==false.
bool ContainsAnyReaspiredVersions(
const StringPiece servable_name,
const std::vector<ServableData<std::unique_ptr<Loader>>>& versions) const
SHARED_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
TF_SHARED_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);

// Performs the action on the harness.
void PerformAction(const AspiredVersionPolicy::ServableAction action)
EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);

// Goes through the harness map and calls the configured servable_policy with
// the state snapshots to get a list of suggested actions. The actions are
// then ordered and finally the topmost one is performed.
optional<AspiredVersionPolicy::ServableAction> GetNextAction()
EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(basic_manager_read_modify_write_mu_);

// Checks for servables that are not aspired and at some final state and tells
// 'basic_manager_' to forget about them. This method is intended to be
// invoked periodically, interleaved with InvokePolicyAndExecuteAction() and
// HandlePendingAspiredVersionsRequests().
void FlushServables() LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_);
void FlushServables() TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_);

// Handles enqueued aspired-versions requests. This method is intended to be
// invoked periodically, interleaved with InvokePolicyAndExecuteAction().
void HandlePendingAspiredVersionsRequests()
LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_,
pending_aspired_versions_requests_mu_);
TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_,
pending_aspired_versions_requests_mu_);

// Invokes the aspired-version policy and executes any returned policy action.
// This method is intended to be invoked periodically.
void InvokePolicyAndExecuteAction()
LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_);
TF_LOCKS_EXCLUDED(basic_manager_read_modify_write_mu_);

// Sets the number of load threads.
//
Expand All @@ -294,7 +294,7 @@ class AspiredVersionsManager : public Manager,
using AspiredVersionsMap =
std::map<string, std::vector<ServableData<std::unique_ptr<Loader>>>>;
AspiredVersionsMap pending_aspired_versions_requests_
GUARDED_BY(pending_aspired_versions_requests_mu_);
TF_GUARDED_BY(pending_aspired_versions_requests_mu_);
mutable mutex pending_aspired_versions_requests_mu_;

// To lock basic_manager_ to perform atomic read/modify/write operations on
Expand Down
20 changes: 10 additions & 10 deletions tensorflow_serving/core/basic_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,16 +718,16 @@ Status BasicManager::ReserveResources(LoaderHarness* harness,
bool resources_reserved;
// We retry reserving resources because it may involve transiently failing
// operations like file-reads.
const Status reserve_resources_status =
Retry(strings::StrCat("Reserving resources for servable: ",
harness->id().DebugString()),
harness_options_.max_num_load_retries,
harness_options_.load_retry_interval_micros,
[&]() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return resource_tracker_->ReserveResources(*harness->loader(),
&resources_reserved);
},
[&]() { return harness->cancel_load_retry(); });
const Status reserve_resources_status = Retry(
strings::StrCat("Reserving resources for servable: ",
harness->id().DebugString()),
harness_options_.max_num_load_retries,
harness_options_.load_retry_interval_micros,
[&]() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return resource_tracker_->ReserveResources(*harness->loader(),
&resources_reserved);
},
[&]() { return harness->cancel_load_retry(); });
if (!reserve_resources_status.ok()) {
return errors::Internal(strings::StrCat(
"Error while attempting to reserve resources to load servable ",
Expand Down
36 changes: 18 additions & 18 deletions tensorflow_serving/core/basic_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,13 @@ class BasicManager : public Manager {
// status if a corresponding harness was found, else an error status.
Status GetHealthyHarness(const ServableId& servable_id,
LoaderHarness** harness)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// Obtains a pointer to every managed loader that is currently holding
// resources, i.e. whose state is one of kApprovedForLoading, kLoading,
// kReady, kUnloadRequested, kQuiescing, kQuiesced or kUnloading.
std::vector<const Loader*> GetLoadersCurrentlyUsingResources() const
EXCLUSIVE_LOCKS_REQUIRED(mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// A load or unload request for a particular servable. Facilitates code
// sharing across the two cases.
Expand All @@ -311,13 +311,13 @@ class BasicManager : public Manager {

// A unification of LoadServable() and UnloadServable().
void LoadOrUnloadServable(const LoadOrUnloadRequest& request,
DoneCallback done_callback) LOCKS_EXCLUDED(mu_);
DoneCallback done_callback) TF_LOCKS_EXCLUDED(mu_);

// The synchronous logic for handling a load/unload request, including both
// the decision and execution phases. This is the method run in the executor.
void HandleLoadOrUnloadRequest(const LoadOrUnloadRequest& request,
DoneCallback done_callback)
LOCKS_EXCLUDED(mu_);
TF_LOCKS_EXCLUDED(mu_);

// The decision phase of whether to approve a load/unload request. Delegates
// to one of ApproveLoad() or ApproveUnload() -- see those methods' comments
Expand All @@ -332,7 +332,7 @@ class BasicManager : public Manager {
// precludes concurrent execution of another request that could delete the
// harness.)
Status ApproveLoadOrUnload(const LoadOrUnloadRequest& request,
LoaderHarness** harness) LOCKS_EXCLUDED(mu_);
LoaderHarness** harness) TF_LOCKS_EXCLUDED(mu_);

// The decision phase of whether to approve a load request.
//
Expand All @@ -343,12 +343,12 @@ class BasicManager : public Manager {
// Argument 'mu_lock' is a lock held on 'mu_'. It is released temporarily via
// 'num_ongoing_load_unload_executions_cv_'.
Status ApproveLoad(LoaderHarness* harness, mutex_lock* mu_lock)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// The decision phase of whether to approve an unload request. If it succeeds,
// places the servable into state kQuiescing. Among other things, that
// prevents a subsequent unload request from proceeding concurrently.
Status ApproveUnload(LoaderHarness* harness) EXCLUSIVE_LOCKS_REQUIRED(mu_);
Status ApproveUnload(LoaderHarness* harness) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// Attempts to reserve the resources required to load the servable in
// 'harness'. Does not make any state transitions on 'harness' -- merely
Expand All @@ -358,7 +358,7 @@ class BasicManager : public Manager {
// Argument 'mu_lock' is a lock held on 'mu_'. It is released temporarily via
// 'num_ongoing_load_unload_executions_cv_'.
Status ReserveResources(LoaderHarness* harness, mutex_lock* mu_lock)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// The execution phase of loading/unloading a servable. Delegates to either
// ExecuteLoad() or ExecuteUnload().
Expand All @@ -369,17 +369,17 @@ class BasicManager : public Manager {
LoaderHarness* harness);

// The execution phase of loading a servable.
Status ExecuteLoad(LoaderHarness* harness) LOCKS_EXCLUDED(mu_);
Status ExecuteLoad(LoaderHarness* harness) TF_LOCKS_EXCLUDED(mu_);

// The execution phase of loading a unservable.
Status ExecuteUnload(LoaderHarness* harness) LOCKS_EXCLUDED(mu_);
Status ExecuteUnload(LoaderHarness* harness) TF_LOCKS_EXCLUDED(mu_);

// Unloads all the managed servables.
Status UnloadAllServables() LOCKS_EXCLUDED(mu_);
Status UnloadAllServables() TF_LOCKS_EXCLUDED(mu_);

// Updates the serving map by copying servables from the managed map, which
// are ready to be served.
void UpdateServingMap() EXCLUSIVE_LOCKS_REQUIRED(mu_);
void UpdateServingMap() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// Sets the number of load threads.
//
Expand All @@ -388,7 +388,7 @@ class BasicManager : public Manager {
// the old thread pool blocks until all threads are done, so it could block
// for a long time.
void SetNumLoadThreads(uint32 num_load_threads)
LOCKS_EXCLUDED(load_executor_mu_);
TF_LOCKS_EXCLUDED(load_executor_mu_);
uint32 num_load_threads() const;

// Keys are the servable names.
Expand All @@ -400,7 +400,7 @@ class BasicManager : public Manager {
// Fetches the harness with this id from the harness_map_. Returns
// harness_map_.end(), if the harness is not found.
ManagedMap::iterator FindHarnessInMap(const ServableId& id)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

// Publishes the state on the event bus, if an event bus was part of the
// options, if not we ignore it.
Expand All @@ -418,7 +418,7 @@ class BasicManager : public Manager {

// ManagedMap contains all the servables managed by this manager, in different
// states.
ManagedMap managed_map_ GUARDED_BY(mu_);
ManagedMap managed_map_ TF_GUARDED_BY(mu_);

// ServingMap contains all the servables which are ready to be served, which
// is a subset of those in the managed map.
Expand Down Expand Up @@ -489,7 +489,7 @@ class BasicManager : public Manager {
const bool flush_filesystem_caches_ = false;
// The executor (and associated mutex) used for executing loads of servables.
mutable mutex load_executor_mu_;
std::unique_ptr<Executor> load_executor_ GUARDED_BY(load_executor_mu_);
std::unique_ptr<Executor> load_executor_ TF_GUARDED_BY(load_executor_mu_);

// The executor used for executing unloads of servables. (Unlike for loads,
// the unload executor is fixed for the lifetime of the manager.)
Expand All @@ -500,10 +500,10 @@ class BasicManager : public Manager {

// A module that keeps track of available, used and reserved servable
// resources (e.g. RAM).
std::unique_ptr<ResourceTracker> resource_tracker_ GUARDED_BY(mu_);
std::unique_ptr<ResourceTracker> resource_tracker_ TF_GUARDED_BY(mu_);

// The number of load/unload requests currently in their execution phase.
int num_ongoing_load_unload_executions_ GUARDED_BY(mu_) = 0;
int num_ongoing_load_unload_executions_ TF_GUARDED_BY(mu_) = 0;

// Used to wake up threads that are waiting for 'num_ongoing_executions' to
// decrease.
Expand Down
6 changes: 3 additions & 3 deletions tensorflow_serving/core/caching_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ class CachingManager : public Manager {
// basic-manager. All other requests block until the load completes and then
// trivially succeed.
Status LoadServable(ServableData<std::unique_ptr<Loader>> loader_data)
LOCKS_EXCLUDED(load_mutex_map_mu_);
TF_LOCKS_EXCLUDED(load_mutex_map_mu_);

// Returns the size of the load_mutex_map_.
int64 GetLoadMutexMapSize() const LOCKS_EXCLUDED(load_mutex_map_mu_);
int64 GetLoadMutexMapSize() const TF_LOCKS_EXCLUDED(load_mutex_map_mu_);

// Erases the entry from the map corresponding to the servable-id if there is
// only one remaining reference to the mutex.
Expand All @@ -156,7 +156,7 @@ class CachingManager : public Manager {
// a shared_ptr to allow for reference counting and consequent garbage
// collection.
std::map<ServableId, std::shared_ptr<mutex>> load_mutex_map_
GUARDED_BY(load_mutex_map_mu_);
TF_GUARDED_BY(load_mutex_map_mu_);

TF_DISALLOW_COPY_AND_ASSIGN(CachingManager);
};
Expand Down
6 changes: 3 additions & 3 deletions tensorflow_serving/core/caching_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ class StringLoaderFactory : public CachingManager::LoaderFactory {
mutable mutex mu_;

// The current earliest version.
int64 earliest_version_ GUARDED_BY(mu_) = 0;
int64 earliest_version_ TF_GUARDED_BY(mu_) = 0;

// The current latest version.
int64 latest_version_ GUARDED_BY(mu_) = 0;
int64 latest_version_ TF_GUARDED_BY(mu_) = 0;

// Tracks the number of loaders dispensed by the loader-factory.
int64 num_loaders_dispensed_ GUARDED_BY(mu_) = 0;
int64 num_loaders_dispensed_ TF_GUARDED_BY(mu_) = 0;

TF_DISALLOW_COPY_AND_ASSIGN(StringLoaderFactory);
};
Expand Down
2 changes: 1 addition & 1 deletion tensorflow_serving/core/dynamic_source_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class DynamicSourceRouter final : public SourceRouter<T> {
const int num_output_ports_;

mutable mutex routes_mu_;
Routes routes_ GUARDED_BY(routes_mu_);
Routes routes_ TF_GUARDED_BY(routes_mu_);

TF_DISALLOW_COPY_AND_ASSIGN(DynamicSourceRouter);
};
Expand Down
Loading

0 comments on commit 3d5b44a

Please sign in to comment.