Skip to content

Commit

Permalink
use deadline & test pubsub/index
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jan 29, 2016
1 parent b2d163b commit 9cf1626
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 97 deletions.
20 changes: 17 additions & 3 deletions lib/common/grpc-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ nodeutil.inherits(GrpcService, Service);
* @param {object} protoOpts - The proto options.
* @param {string} protoOpts.service - The service name.
* @param {string} protoOpts.method - The method name.
* @param {number=} protoOpts.timeout - After how many milliseconds should the
* request cancel.
* @param {object} reqOpts - The request options.
* @param {function=} callback - The callback function.
*/
Expand Down Expand Up @@ -185,11 +187,22 @@ GrpcService.prototype.request = function(protoOpts, reqOpts, callback) {
return;
}

var grpcOpts = {};

if (is.number(protoOpts.timeout)) {
grpcOpts.deadline = new Date(Date.now() + protoOpts.timeout);
}

var service = new proto[protoOpts.service](
this.baseUrl,
this.grpcCredentials
);

// snakeize and camelize are used to transform camelCase request options to
// snake_case. This is what ProtoBuf.js (via gRPC) expects. Similarly, the
// response is in snake_case, which is why we use camelize to return it to
// camelCase. An option will be added to gRPC to allow us to skip this step:
// https://github.com/GoogleCloudPlatform/gcloud-node/pull/1070#discussion_r51285492
service[protoOpts.method](snakeize(reqOpts), function(err, resp) {
if (err) {
if (HTTP_ERROR_CODE_MAP[err.code]) {
Expand All @@ -210,8 +223,9 @@ GrpcService.prototype.request = function(protoOpts, reqOpts, callback) {
for (var prop in data) {
var value = data[prop];

// @todo - Set preference on gRPC deserializeCls method to not give
// raw buffers.
// An option will be added to gRPC to expose a setting which will
// replace this function (convertBuffers).
// https://github.com/GoogleCloudPlatform/gcloud-node/pull/1070#discussion_r51285492
if (is.object(value) && value.length) {
data[prop] = new Buffer(value).toString('base64');
} else if (is.object(value)) {
Expand All @@ -224,7 +238,7 @@ GrpcService.prototype.request = function(protoOpts, reqOpts, callback) {
}

callback(null, convertBuffers(camelize(resp)));
});
}, null, grpcOpts);
};

module.exports = GrpcService;
18 changes: 11 additions & 7 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,9 @@ PubSub.prototype.getTopics = function(query, callback) {
method: 'listTopics'
};

var reqOpts = extend(query, {
var reqOpts = extend({
project: 'projects/' + this.projectId
});
}, query);

this.request(protoOpts, reqOpts, function(err, result) {
if (err) {
Expand Down Expand Up @@ -416,6 +416,9 @@ PubSub.prototype.getTopics = function(query, callback) {
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken.
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request
* @param {module:pubsub/subscription} callback.subscription - The subscription.
Expand Down Expand Up @@ -451,8 +454,6 @@ PubSub.prototype.getTopics = function(query, callback) {
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
*/
PubSub.prototype.subscribe = function(topic, subName, options, callback) {
var self = this;

if (!is.string(topic) && !(topic instanceof Topic)) {
throw new Error('A Topic is required for a new subscription.');
}
Expand All @@ -470,29 +471,32 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
topic = this.topic(topic);
}

var subscription = this.subscription(subName, options);

var protoOpts = {
service: 'Subscriber',
method: 'createSubscription'
method: 'createSubscription',
timeout: options.timeout
};

var reqOpts = extend(true, {}, options, {
topic: topic.name,
name: this.subscription(subName).name
name: subscription.name
});

delete reqOpts.autoAck;
delete reqOpts.encoding;
delete reqOpts.interval;
delete reqOpts.maxInProgress;
delete reqOpts.reuseExisting;
delete reqOpts.timeout;

this.request(protoOpts, reqOpts, function(err, resp) {
if (err && !(err.code === 409 && options.reuseExisting)) {
callback(err, null, resp);
return;
}

var subscription = self.subscription(subName, options);
callback(null, subscription, resp);
});
};
Expand Down
27 changes: 24 additions & 3 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ var util = require('../common/util.js');
* @param {string} options.name - Name of the subscription.
* @param {number} options.maxInProgress - Maximum messages to consume
* simultaneously.
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken. (default: 90000)
*/
/**
* A Subscription object will give you access to your Google Cloud Pub/Sub
Expand Down Expand Up @@ -250,6 +253,16 @@ function Subscription(pubsub, options) {
this.messageListeners = 0;
this.paused = false;

if (is.number(options.timeout)) {
this.timeout = options.timeout;
} else {
// The default timeout used in gcloud-node is 60s, but a pull request times
// out around 90 seconds. Allow an extra couple of seconds to give the API a
// chance to respond on its own before terminating the connection.
var PUBSUB_API_TIMEOUT = 90000;
this.timeout = PUBSUB_API_TIMEOUT + 2000;
}

/**
* [IAM (Identity and Access Management)](https://cloud.google.com/pubsub/access_control)
* allows you to set permissions on invidual resources and offers a wider
Expand Down Expand Up @@ -511,7 +524,8 @@ Subscription.prototype.pull = function(options, callback) {

var protoOpts = {
service: 'Subscriber',
method: 'pull'
method: 'pull',
timeout: this.timeout
};

var reqOpts = {
Expand All @@ -522,8 +536,15 @@ Subscription.prototype.pull = function(options, callback) {

this.request(protoOpts, reqOpts, function(err, response) {
if (err) {
callback(err, null, response);
return;
if (err.code === 504) {
// Simulate a server timeout where no messages were received.
response = {
receivedMessages: []
};
} else {
callback(err, null, response);
return;
}
}

var messages = arrify(response.receivedMessages)
Expand Down
Loading

0 comments on commit 9cf1626

Please sign in to comment.