diff --git a/src/postgres/src/backend/access/transam/xact.c b/src/postgres/src/backend/access/transam/xact.c index f52bc4fef24f..15877f46aa31 100644 --- a/src/postgres/src/backend/access/transam/xact.c +++ b/src/postgres/src/backend/access/transam/xact.c @@ -3053,6 +3053,45 @@ void CommitTransactionCommand(void) { TransactionState s = CurrentTransactionState; + /* Update the session parameter to the shared memory */ + switch (s->blockState) + { + case TBLOCK_END: /* COMMIT received */ + case TBLOCK_STARTED: /* running single-query transaction */ + { + /* every query will be treated as a single transaction */ + /* Copy the session parameter from the local memory to the shared memory */ + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating the shared memory"))); + + YbUpdateSharedMemory(); + YbCleanChangedSessionParameter(); + } + break; + case TBLOCK_BEGIN: + { + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Reset the changed session parameters"))); + YbCleanChangedSessionParameter(); + break; + } + case TBLOCK_SUBBEGIN: /* starting a subtransaction */ + case TBLOCK_SUBINPROGRESS: /* live subtransaction */ + case TBLOCK_SUBRELEASE: /* RELEASE received */ + case TBLOCK_SUBCOMMIT: /* COMMIT received while TBLOCK_SUBINPROGRESS */ + case TBLOCK_SUBABORT: /* failed subxact, awaiting ROLLBACK */ + case TBLOCK_SUBABORT_END: /* failed subxact, ROLLBACK received */ + case TBLOCK_SUBABORT_PENDING: /* live subxact, ROLLBACK received */ + case TBLOCK_SUBRESTART: /* live subxact, ROLLBACK TO received */ + case TBLOCK_SUBABORT_RESTART: /* failed subxact, ROLLBACK TO received */ + /* do nothing for sub transaction since */ + break; + default: + /* do nothing */ + break; + } switch (s->blockState) { @@ -6214,4 +6253,4 @@ void YbClearCurrentTransactionId() { CurrentTransactionState->transactionId = InvalidTransactionId; MyPgXact->xid = InvalidTransactionId; -} +} \ No newline at end of file diff --git a/src/postgres/src/backend/postmaster/postmaster.c b/src/postgres/src/backend/postmaster/postmaster.c index f25fb33c3d1c..7d049f113c16 100644 --- a/src/postgres/src/backend/postmaster/postmaster.c +++ b/src/postgres/src/backend/postmaster/postmaster.c @@ -73,6 +73,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,8 @@ #include "utils/dynamic_loader.h" #include "utils/memutils.h" #include "utils/pidfile.h" +#include "utils/guc.h" +#include "utils/guc_tables.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/varlena.h" @@ -2613,7 +2616,6 @@ ClosePostmasterPorts(bool am_syslogger) #endif } - /* * reset_shared -- reset shared memory and semaphores */ diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index d0501f53d0d0..846d1bc1c8d2 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -82,6 +82,7 @@ #include "utils/builtins.h" #include "utils/bytea.h" #include "utils/guc_tables.h" +#include "utils/shared_memory.h" #include "utils/float.h" #include "utils/memutils.h" #include "utils/pg_locale.h" @@ -93,7 +94,7 @@ #include "utils/tzparser.h" #include "utils/varlena.h" #include "utils/xml.h" - +#include #include "pg_yb_utils.h" #ifndef PG_KRB_SRVTAB @@ -526,6 +527,8 @@ int tcp_keepalives_idle; int tcp_keepalives_interval; int tcp_keepalives_count; +char yb_session_client_id[20]; +struct list_changed_parameters *yb_changed_session_parameter = NULL; /* * SSL renegotiation was been removed in PostgreSQL 9.5, but we tolerate it * being set to zero (meaning never renegotiate) for backward compatibility. @@ -6230,6 +6233,34 @@ BeginReportingGUCOptions(void) } } +/* + * YbUpdateSessionParameter + * Updates the changes regarding the session parameter + */ +void YbUpdateSessionParameter(const char* parameter_name){ + if(strcmp(yb_session_client_id,"")==0) + return ; /* No client id */ + + /* Update the local parameter */ + + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating the value %s",parameter_name))); + + struct list_changed_parameters *temp; + /* Check whether the session parameter has alredy changed in the on going transaction */ + + for(temp = yb_changed_session_parameter ; temp!=NULL; temp = temp->next) + if(strcmp(temp->parameter_name,parameter_name)==0) + return ; /* the parameter name already exist in the list of changed session parameter */ + + + temp = (struct list_changed_parameters *) malloc(sizeof(struct list_changed_parameters)); + strcpy(temp->parameter_name,parameter_name); + temp->next = yb_changed_session_parameter; + yb_changed_session_parameter = temp; +} + /* * ReportGUCOption: if appropriate, transmit option value to frontend */ @@ -7636,7 +7667,7 @@ set_config_option(const char *name, const char *value, #undef newval } } - + if (changeVal && (record->flags & GUC_REPORT)) ReportGUCOption(record); @@ -8324,6 +8355,83 @@ AlterSystemSetConfigFile(AlterSystemStmt *altersysstmt) LWLockRelease(AutoFileLock); } +/* + * YbHandleSetClientId - Handles SET client_id + * 1. Assign the yb_session_client_id + * 2. Reset all parameters + * + */ +void YbHandleSetClientId(const char * client_id) +{ + int yb_client_id = atoi(client_id); + + ResetAllOptions(); + + /* if the client_id is -1 */ + if(yb_client_id<0) + { + int new_client_id = yb_shmem_get(); + if(new_client_id > 0 ) + ereport(WARNING,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("shmkey=%d",new_client_id))); + }else{ + strcpy(yb_session_client_id,client_id); + /* Get the Key */ + int shared_memory_key; + int array_len; + if( (shared_memory_key = shmget((key_t)yb_client_id, YB_SHMEM_ARR_LEN, 0666)) < 0 ){ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Error at shmget"))); + + return ; + } + + char *shared_memory_pointor ; + + if( (shared_memory_pointor = shmat(shared_memory_key, NULL, 0)) == (char *) -1){ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Error at shmat"))); + + return ; + } + /* process the header */ + /* client_id */ + Assert(strcmp(client_id,shared_memory_pointor)==0); /* Both values must match */ + shared_memory_pointor+=20; + + /* array_len */ + array_len = atoi(shared_memory_pointor); /* max length of the array */ + shared_memory_pointor+=20; + + /* database */ + ereport(WARNING,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("database =%s",shared_memory_pointor))); + shared_memory_pointor+=20; + + /* role */ + ereport(WARNING,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("role =%s",shared_memory_pointor))); + shared_memory_pointor+=20; + + int i; + for(i=0; iname,"client_id")==0) + { + YbHandleSetClientId(ExtractSetVariableArgs(stmt)); /* Handle the client_id */ + return; + } switch (stmt->kind) { @@ -8362,13 +8479,21 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel) case VAR_SET_CURRENT: if (stmt->is_local) WarnNoTransactionBlock(isTopLevel, "SET LOCAL"); - (void) set_config_option(stmt->name, + + if(set_config_option(stmt->name, ExtractSetVariableArgs(stmt), (superuser() || YbDbAdminCanSet ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, - action, true, 0, false); + action, true, 0, false) == 1) + { + YbUpdateSessionParameter(stmt->name); + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating ...."))); + } check_reserved_prefixes(stmt->name); + break; case VAR_SET_MULTI: @@ -8497,12 +8622,18 @@ SetPGVariable(const char *name, List *args, bool is_local) char *argstring = flatten_set_variable_args(name, args); /* Note SET DEFAULT (argstring == NULL) is equivalent to RESET */ - (void) set_config_option(name, + if(set_config_option(name, argstring, (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET, - true, 0, false); + true, 0, false)==1 && is_local == 0) + { + YbUpdateSessionParameter(name); + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating ...."))); + } } /* @@ -8540,12 +8671,16 @@ set_config_by_name(PG_FUNCTION_ARGS) is_local = PG_GETARG_BOOL(2); /* Note SET DEFAULT (argstring == NULL) is equivalent to RESET */ - (void) set_config_option(name, + if(set_config_option(name, value, (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET, - true, 0, false); + true, 0, false)==1){ + YbUpdateSessionParameter(name); + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating ....")));} /* get the new current value */ new_value = GetConfigOptionByName(name, NULL, false); @@ -8554,7 +8689,6 @@ set_config_by_name(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(cstring_to_text(new_value)); } - /* * Common code for DefineCustomXXXVariable subroutines: allocate the * new variable's config struct and fill in generic fields. @@ -10924,6 +11058,114 @@ GUCArrayReset(ArrayType *array) return newarray; } +/* + * YbUpdateSharedMemory + * Copy the changed session parameter value to the shared memory + * 1. Check for `yb_session_client_id` if not present then exit + * 2. Copy the context from local memory to shared memory + * 3. Delete the local context. + * 4. Reset `yb_session_client_id` + * + * NOTE: This function will only be called on `COMMIT` or `SUBCOMMIT`. + */ +void YbUpdateSharedMemory(){ + if(strcmp(yb_session_client_id,"")==0) + { + /* yb_changed_session_parameter can only be present if yb_session_client_id is set */ + Assert(yb_changed_session_parameter==NULL); + return; + } + int client_id = atoi(yb_session_client_id); + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Processing ..."))); + /* Get the Key */ + + int shared_memory_key; + if( (shared_memory_key = shmget((key_t)client_id, YB_SHAMEM_SIZE, 0666)) < 0 ){ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Error at shmget"))); + + return ; + } + + char *list_shared_parameter; + + if( (list_shared_parameter = shmat(shared_memory_key, NULL, 0)) == (char *) -1){ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Error at shmat"))); + + return ; + } + int i; + bool found =0 ; + /* debug::-- working fine till here */ + Assert(strcmp(yb_session_client_id,list_shared_parameter)==0); + list_shared_parameter += 20; + + const int array_size = atoi(list_shared_parameter); + list_shared_parameter += 60; /* ignore database and role */ + char *reset_val = list_shared_parameter; + for(struct list_changed_parameters *temp = yb_changed_session_parameter; temp!=NULL; temp=temp->next) + { + list_shared_parameter = reset_val; + char *parameter_name = temp->parameter_name; + + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Processing %s ", parameter_name))); + + for(i=0; inext; + free(yb_changed_session_parameter); + yb_changed_session_parameter = temp; + } +} + /* * Validate a proposed option setting for GUCArrayAdd/Delete/Reset. * @@ -11949,4 +12191,113 @@ check_backoff_multiplier(double *multiplier, void **extra, GucSource source) return true; } +/* + * yb_shmem_insert + * + * compares the size of the shared memory available and required. + * resize the memory + * compares and insert session parameters and values in the shared_memory + * + * Returns --> + * 0 --> Successfull update + * -1 --> Update failed due to memory issue at shmget + * -2 --> Update failed due to other issues at shmget + * -3 --> Reading failed at shmat + * + */ +int yb_shmem_insert(char **list_parameters, const unsigned int ipc_key ){ + int array_length; + struct yb_shared_parameter *list_shared_parameter; + /* get the list len */ + int shared_memory_key; + if( (shared_memory_key = shmget((key_t)ipc_key, sizeof(int), 0666)) < 0 ){ + perror("Failed to get the memory size: Unable to use shmget:"); + if(errno == YB_SHMEM_ALLOC_ERR_CODE) + return -1; + else + return -2; + + } + int *array_length_ptr; + if( (array_length_ptr = shmat(shared_memory_key, NULL, 0)) == NULL ){ + perror("Failed to READ memory size:"); + return -3 ; + } + array_length = *array_length_ptr; + shmdt(array_length_ptr); + + /* get the list */ + void *shared_mem_pointer; + if( (shared_memory_key = shmget((key_t)ipc_key, sizeof(int)+sizeof(struct yb_shared_parameter)*array_length, 0666)) < 0 ){ + perror("Failed to get the memory size: Unable to use shmget:"); + if(errno == YB_SHMEM_ALLOC_ERR_CODE) + return -1; + else + return -2; + + } + if( (shared_mem_pointer = shmat(shared_memory_key, NULL, 0)) == NULL){ + perror("Failed to READ memory size:"); + return -3; + } + shared_mem_pointer += sizeof(int); /* skip the mem size */ + list_shared_parameter = (struct yb_shared_parameter*) shared_mem_pointer; + if(list_shared_parameter == NULL) + return 0; + /* + * compare the names and consider the upgradation of the parameter (can be optimised ) + */ + /* + * update values that are already present + */ + /* + * resize the shared memory if needed + */ + /* + * add new space + */ + return 1; +} + +int yb_shmem_get() +{ + + /* + * structure of the array size + * | array_len | parameter_name_1 | parameter_value_1 | parameter_name_2 | ..... + * + */ + int shmem_key = KEY_MIN_SHAMEM; + int ipc_key; + char *shared_mem_pointer; + //struct yb_shared_parameter *list_shared_parameter; + /* get the shmem_key */ + while( (ipc_key = shmget((key_t)shmem_key,YB_SHAMEM_SIZE, 0666|IPC_EXCL|IPC_CREAT))<0 ) + { + shmem_key++; + /* it will fail if the key exist */ + } + /* we got a unique key */ + + + /* get the memory pointer*/ + if( (shared_mem_pointer = shmat(ipc_key, NULL, 0)) == NULL){ + perror("Failed to READ memory size:"); + return -3; + } + + /* Header file */ + sprintf(shared_mem_pointer,"%d",shmem_key); + shared_mem_pointer += 20 ; + sprintf(shared_mem_pointer,"%d",YB_SHMEM_ARR_LEN); + shared_mem_pointer += 20 ; + strcpy(shared_mem_pointer,"yugabyte"); /* database */ + shared_mem_pointer += 20 ; + strcpy(shared_mem_pointer,"yugabyte_role"); /* user */ + shared_mem_pointer += 20 ; + + return shmem_key; +} + + #include "guc-file.c" diff --git a/src/postgres/src/include/utils/guc.h b/src/postgres/src/include/utils/guc.h index 466745a9d216..645f6f5f6bb6 100644 --- a/src/postgres/src/include/utils/guc.h +++ b/src/postgres/src/include/utils/guc.h @@ -405,6 +405,8 @@ extern ArrayType *GUCArrayAdd(ArrayType *array, const char *name, const char *va extern ArrayType *GUCArrayDelete(ArrayType *array, const char *name); extern ArrayType *GUCArrayReset(ArrayType *array); +extern void YbUpdateSharedMemory(); +extern void YbCleanChangedSessionParameter(); #ifdef EXEC_BACKEND extern void write_nondefault_variables(GucContext context); extern void read_nondefault_variables(void); @@ -455,4 +457,21 @@ extern void assign_search_path(const char *newval, void *extra); extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +#define YB_SHMEM_ARR_LEN 10 + +#define YB_SHAMEM_SIZE (40* YB_SHMEM_ARR_LEN +8) /* + * Strarting memory size (in bytes) of shared memory + * for storing the `session_parameter` of a given client + */ + +#define KEY_MIN_SHAMEM 2346 /* Smallest key value that will be used for shared memory */ +#define MAX_PARAM_NAME_LEN 30 /* MAX Length of the parameter name */ +#define MAX_PARAM_VALUE_LEN 20 /* MAX Length of the parameter size */ + +#define YB_SHMEM_ALLOC_ERR_CODE 22 +extern int yb_shmem_create(); /* creates a shared memory block for a client id */ +/* wrapper functions */ +extern int yb_shmem_remove(); /* remove the shared memory block for a client id */ +extern int yb_shmem_get(); /* remove the shared memory block for a client id */ +extern int yb_shmem_insert(char **list_parameters, const unsigned int icp_id ); /* insert a given */ #endif /* GUC_H */ diff --git a/src/postgres/src/include/utils/guc_tables.h b/src/postgres/src/include/utils/guc_tables.h index c2c922985aae..aa93e2d5ca1a 100644 --- a/src/postgres/src/include/utils/guc_tables.h +++ b/src/postgres/src/include/utils/guc_tables.h @@ -13,7 +13,7 @@ */ #ifndef GUC_TABLES_H #define GUC_TABLES_H 1 - +#define SHAMEM_SIZE 7 #include "utils/guc.h" /* @@ -268,6 +268,18 @@ struct config_enum void *reset_extra; }; +struct yb_shared_parameter +{ + char yb_proxy_client_id[6]; /* client_id generated by the yb_proxy */ + char parameter_value[20]; /* value of the parameter */ + char parameter_name[20]; /* value of the parameter */ +}; + +struct list_changed_parameters +{ + char parameter_name[20]; /* name of the session parameter changed */ + struct list_changed_parameters *next; /* next element in the list */ +}; /* constant tables corresponding to enums above and in guc.h */ extern const char *const config_group_names[]; extern const char *const config_type_names[]; diff --git a/src/postgres/src/interfaces/libpq/fe-connect.c b/src/postgres/src/interfaces/libpq/fe-connect.c index c6053e4f9464..b43e0248725a 100644 --- a/src/postgres/src/interfaces/libpq/fe-connect.c +++ b/src/postgres/src/interfaces/libpq/fe-connect.c @@ -184,6 +184,15 @@ static const internalPQconninfoOption PQconninfoOptions[] = { * the array so as not to reject conninfo strings from old apps that might * still try to set it. */ + + {"load_balance" , "YBLOAD_BALANCE", NULL, NULL, + "Load-Balancing" , "D" , 20 , + offsetof(struct pg_conn, load_balance) }, + + {"topology_keys" , "YBTOPOLOGY_KEYS", NULL, NULL, + "topology_keys" , "D" , 20 , + offsetof(struct pg_conn, topology_keys) }, + {"authtype", "PGAUTHTYPE", DefaultAuthtype, NULL, "Database-Authtype", "D", 20, -1}, @@ -552,7 +561,7 @@ pqDropServerData(PGconn *conn) * * If it is desired to connect in a synchronous (blocking) manner, use the * function PQconnectdb or PQconnectdbParams. The former accepts a string of - * option = value pairs (or a URI) which must be parsed; the latter takes two + * option = value pairs (or a URI) which must be parsed;the latter takes two * NULL terminated arrays instead. * * To connect in an asynchronous (non-blocking) manner, use the functions @@ -618,6 +627,59 @@ PQpingParams(const char *const *keywords, return ret; } + +/* + * node_details will store the details regarding the connection count for any server + */ +struct node_details { + char *host_ip; + int connections; + char *topology; + bool is_running; +} *server_details = NULL; + +/* + * YBclientNetworkStatus + * 1 if the client is identified inside the YB network + * -1 if the client is identified outside the YB network + * 0 network status is not yet found + */ +int YBclientNetworkStatus = 0; + +/* + * Before iterating over the map the map_ready mutex must be checked + */ +static pthread_mutex_t map_ready = PTHREAD_MUTEX_INITIALIZER; +int total_servers =0; +time_t yb_last_update_time = 0; + +/* + * YBupdateMap + * + * YBupdateMap will be used to increase or decrease the count of connection for any server + * Input: The ip_address of the server + * change: +1 to increment / -1 to decrement the connection count. + * check_thread_safe: Boolean value, if true then lock the map_ready thread lock + * before iterating. + */ +void YBupdateMap(const char *ip_address , int change , bool check_thread_safe) +{ + /* In case if the ip_address is NULL */ + if(!ip_address) + return; + + /* Add a thread lock for the map */ + if(check_thread_safe) + pthread_mutex_lock(&(map_ready)); + + for(int i =0;i < total_servers;i++) + if(strcmp(server_details[i].host_ip , ip_address) == 0) + server_details[i].connections += change; /* Update the connection count */ + + if(check_thread_safe) + pthread_mutex_unlock(&(map_ready)); +} + /* * PQconnectdb * @@ -731,21 +793,627 @@ PQconnectStartParams(const char *const *keywords, PQconninfoFree(connOptions); /* - * Compute derived options + * Check for the load_balance */ - if (!connectOptions2(conn)) - return conn; + if(conn->load_balance != NULL && strcmp(conn->load_balance , "true") == 0) + { + /* + * Make the smart connection with the loadbalance feature + */ + if(!YBconnectLoadBalance(conn)) + conn->status = CONNECTION_BAD; + + }else + { + /* + * Compute derived options + */ + if (!connectOptions2(conn)) + return conn; + + if (!connectDBStart( conn)) + { + /* + * Just in case we failed to set it in connectDBStart + */ + conn->status = CONNECTION_BAD; + } + } + + return conn; +} +/* + * YBtestNetwork + * Checks whether the Client is inside the network of the YB cluster for a given control connection and IP address + */ +bool YBtestNetwork(const PGconn *control_connection,char* private_ip) +{ + if(!(private_ip) || private_ip==NULL) + return 0; + + PGconn *new_conn = makeEmptyPGconn(); + *new_conn = *control_connection; + + new_conn->pghostaddr=NULL; + free(new_conn->pghost); + new_conn->pghost = (char*)malloc(sizeof(private_ip)+1); + strcpy(new_conn->pghost,private_ip); + + + new_conn->load_balance="false"; + + if (!connectOptions2(new_conn) || + !connectDBComplete(new_conn)|| + !connectDBStart( new_conn)) + { + PQfinish(new_conn); + return 0; + }else + { + PQfinish(new_conn); + return 1; + } +} + + +/* + * YBupdateClusterinfo + * + * YBupdateClusterinfo populates the data regarding the server into the + * server_details map/list. + * If the last update happened before 5 minutes the update will be skipped. + * Use contro_connection to execute the query : "SELECT host , port , num_connections , node_type , cloud , region , zone , public_ip from yb_servers();" + * Once the query's results has been received the yb_last_update_time is modified. + * If any server that is present in server_details but is absent in the result + * of the above query it is considered to be down. All other servers present in + * the result are considered to be running. + * It returns 1 for every successful update and 0 for any failure. + */ +bool YBupdateClusterinfo(PGconn *conn) +{ /* - * Connect to the database + * Check for the last update time + */ + const int refresh_time = 5*60; /* 5 mins */ + time_t temp_time = time(NULL); + + if((yb_last_update_time!=0)&&(temp_time-yb_last_update_timestatus = CONNECTION_BAD; + return 0; + } + + int nServers = PQntuples(res); /* Total number of servers found in the query's result */ + int increase_map_size = 0; /* For keeping the count of servers to be added. */ + bool server_to_add[nServers]; /* For keeping the mark of servers to be added. */ + + /* + * If the Network status is not yet decided. + * Identify if the Client is inside the Private network. */ - if (!connectDBStart(conn)) + if (YBclientNetworkStatus==0) { - /* Just in case we failed to set it in connectDBStart */ + int itr; + for(itr=0;itrpghost && strcmp(conn->pghost,PQgetvalue(res, itr , 0))==0 ) || (conn->pghostaddr && strcmp(conn->pghostaddr,PQgetvalue(res, itr , 0))==0) ) + { + YBclientNetworkStatus=1; + break; + } + } + } + + /* + * If the Network status is not yet decided. + * Identify if the Client is outside the Private network. + */ + if (YBclientNetworkStatus==0) + { + int itr; + for(itr=0;itrpghost && strcmp(conn->pghost,PQgetvalue(res, itr , 7))==0) || (conn->pghostaddr && strcmp(conn->pghostaddr,PQgetvalue(res, itr , 7)))==0) + { + YBclientNetworkStatus=-1; + break; + } + } + } + + /* + * If the Network status is not yet decided. + * Try connecting with all the Private IP address. + */ + if (YBclientNetworkStatus==0) + { + int itr; + for(itr=0;itr server_details[i].connections) + { + *next_host_ip = server_details[i].host_ip; + lowest_value = server_details[i].connections; + } + } + } + + if(*next_host_ip != NULL) + { + /* + * Since no other thread is iterating the server_details the + * YBupdateMap can be called with check_thread_safe as false. + */ + YBupdateMap( *next_host_ip,1,false); + + /* + * Unlock the map_ready thread lock. + */ + pthread_mutex_unlock(&(map_ready)); + return 1; + }else + { + /* + * Unlock the map_ready thread lock. + */ + pthread_mutex_unlock(&(map_ready)); + return 0; + } +} + +/* + * YBserverStatusChange + * + * YBserverStatusChange() changes the is_running status of the server + * returns true if the chage is successful else false + */ +bool YBserverStatusChange(char *server_address , bool new_status , bool check_thread_safe) +{ + + /* + * Check if the map is ready to iterate + */ + if(check_thread_safe) + pthread_mutex_lock(&(map_ready)); + + for(int i = 0;i < total_servers;i++) + { + if(strcmp(server_details[i].host_ip,server_address)==0) + { + server_details[i].is_running = new_status; + if(check_thread_safe) + pthread_mutex_unlock(&(map_ready)); + return true; + } + } + + if(check_thread_safe) + pthread_mutex_unlock(&(map_ready)); + + return false; +} + +/* + * control_connection is the backend connection that will + * be used to update the information about the servers in the cluster. + */ +PGconn * control_connection = NULL; + +/* + * thread_lock mutex for YBcheckControlConnection function + */ +static pthread_mutex_t sync_control_connection = PTHREAD_MUTEX_INITIALIZER; + +/* + * YBcheckControlConnection + * + * YBcheckControlConnection is used to establish the control connection and + * update the server_details. + * 1. Establish the control connection + * 2. Initialize the map + * 3. Update the clusters in the map + */ +bool YBcheckControlConnection(PGconn *conn) +{ + /* + * Thread lock + */ + pthread_mutex_lock(&sync_control_connection); + + start_control_connection : + + /* + * Check if the control_connection has already been established or not. + */ + if(control_connection == NULL) + { + /* + * Allocate the memory for the control_connection. + */ + if(!control_connection) + control_connection = makeEmptyPGconn(); + + /* + * Unable to allocate the memory + */ + if (control_connection == NULL) + { + /* + * Thread unlock + */ + pthread_mutex_unlock(&sync_control_connection); + return 0; + } + + /* + * Copy the connection info + */ + *control_connection = *conn; + + /* + * Modify the load_balance feature to false + */ + control_connection->load_balance = "false"; + control_connection->topology_keys = NULL; + + /* + * try_next_server is the index of the server + * we are trying to connect in the list server_details. + * Its value is -1 for the ip address provided by the user. + */ + int try_next_server = -1 ; + + /* + * Try connecting with the server + */ + next_server_for_control_connection: + /* + * Compute derived options + */ + if (!connectOptions2(control_connection) || + !connectDBStart( control_connection) || + !connectDBComplete(control_connection) || + !YBupdateClusterinfo(control_connection)) + { + /* + * Try connecting with next server available in the cluster + */ + try_next_server++; + if(try_next_server < total_servers) + { + /* + * Try connecting to the next server + */ + control_connection->pghost = server_details[try_next_server].host_ip; + goto next_server_for_control_connection; + }else + { + /* + * We are unable to establish any control_connection + */ + control_connection = NULL; + + /* + * Thread unlock + */ + pthread_mutex_unlock(&sync_control_connection); + return 0; + } + } + } + + if(!YBupdateClusterinfo(control_connection)) + { + /* + * Unable to connect/retrieve data + */ + control_connection = NULL; + goto start_control_connection; + } + + /* + * Thread unlock + */ + pthread_mutex_unlock(&sync_control_connection); + return 1; +} + +/* + * YBconnectLoadBalance + * + * YBconnectLoadBalance function will be used to make any LoadBalanced connection + * Input - PGconn connection object + * 1. Check that the control connection is established + * 2. Consider the host with lowest number of connections and try to connect with it. + * 3. If the connection fails goto step 2 and repeat until all the available hosts are checked. + * 4. Once a connection is established return true else false if unable to establish connection with any host. + */ +bool YBconnectLoadBalance(PGconn *conn) +{ + /* + * Check the control connection + */ + if(!YBcheckControlConnection(conn)) + { + conn->pghost = NULL; + return 0; + } + + conn->pghost = NULL; + conn->pghostaddr = NULL; + char *next_least_connection ; + + next_server_for_connection: + + /* + * Allocate the host with least number of connection + */ + if(YBnextHost(conn->topology_keys , &next_least_connection)) + { + if(conn->pghost) + free(conn->pghost); + conn->pghost = (char *) malloc((strlen(next_least_connection)+1) * sizeof(char)); + strcpy(conn->pghost , next_least_connection); + } + else + { + /* + * If next_host returns false then the map was not updated + */ conn->status = CONNECTION_BAD; + return 0; } - return conn; + /* + * Compute derived options + */ + if (!connectOptions2(conn) || !connectDBStart( conn)) + { + /* + * Update the server's is_running status to false + */ + YBserverStatusChange(next_least_connection,false,true); + + /* + * Since the connection count was optimistically incremented, decrement the count. + */ + YBupdateMap(conn->pghost,-1,true) ; + + /* + * Try connecting with the next host + */ + goto next_server_for_connection; + } + return 1 ; } /* @@ -784,20 +1452,27 @@ PQconnectStart(const char *conninfo) */ if (!connectOptions1(conn, conninfo)) return conn; - - /* - * Compute derived options - */ - if (!connectOptions2(conn)) - return conn; - + /* - * Connect to the database + * Check for the load_balance */ - if (!connectDBStart(conn)) + if (conn->load_balance != NULL && strcmp(conn->load_balance , "true") == 0) { - /* Just in case we failed to set it in connectDBStart */ - conn->status = CONNECTION_BAD; + /* + * Make the smart connection with the loadbalance feature + */ + if (!YBconnectLoadBalance(conn)) + conn->status = CONNECTION_BAD; + }else + { + /* + * Compute derived options + */ + if (!connectOptions2(conn)) + return conn; + + if (!connectDBStart( conn)) + conn->status = CONNECTION_BAD; /* Just in case we failed to set it in connectDBStart */ } return conn; @@ -816,7 +1491,7 @@ fillPGconn(PGconn *conn, PQconninfoOption *connOptions) { const internalPQconninfoOption *option; - for (option = PQconninfoOptions; option->keyword; option++) + for (option = PQconninfoOptions;option->keyword;option++) { if (option->connofs >= 0) { @@ -896,7 +1571,7 @@ count_comma_separated_elems(const char *input) int n; n = 1; - for (; *input != '\0'; input++) + for (;*input != '\0';input++) { if (*input == ',') n++; @@ -923,7 +1598,7 @@ parse_comma_separated_list(char **startptr, bool *more) int len; /* - * Search for the end of the current element; a comma or end-of-string + * Search for the end of the current element;a comma or end-of-string * acts as a terminator. */ e = s; @@ -982,7 +1657,7 @@ connectOptions2(PGconn *conn) char *s = conn->pghostaddr; bool more = true; - for (i = 0; i < conn->nconnhost && more; i++) + for (i = 0;i < conn->nconnhost && more;i++) { conn->connhost[i].hostaddr = parse_comma_separated_list(&s, &more); if (conn->connhost[i].hostaddr == NULL) @@ -1003,7 +1678,7 @@ connectOptions2(PGconn *conn) char *s = conn->pghost; bool more = true; - for (i = 0; i < conn->nconnhost && more; i++) + for (i = 0;i < conn->nconnhost && more;i++) { conn->connhost[i].host = parse_comma_separated_list(&s, &more); if (conn->connhost[i].host == NULL) @@ -1025,7 +1700,7 @@ connectOptions2(PGconn *conn) * Now, for each host slot, identify the type of address spec, and fill in * the default address if nothing was given. */ - for (i = 0; i < conn->nconnhost; i++) + for (i = 0;i < conn->nconnhost;i++) { pg_conn_host *ch = &conn->connhost[i]; @@ -1076,7 +1751,7 @@ connectOptions2(PGconn *conn) char *s = conn->pgport; bool more = true; - for (i = 0; i < conn->nconnhost && more; i++) + for (i = 0;i < conn->nconnhost && more;i++) { conn->connhost[i].port = parse_comma_separated_list(&s, &more); if (conn->connhost[i].port == NULL) @@ -1089,7 +1764,7 @@ connectOptions2(PGconn *conn) */ if (i == 1 && !more) { - for (i = 1; i < conn->nconnhost; i++) + for (i = 1;i < conn->nconnhost;i++) { conn->connhost[i].port = strdup(conn->connhost[0].port); if (conn->connhost[i].port == NULL) @@ -1161,7 +1836,7 @@ connectOptions2(PGconn *conn) if (conn->pgpassfile != NULL && conn->pgpassfile[0] != '\0') { - for (i = 0; i < conn->nconnhost; i++) + for (i = 0;i < conn->nconnhost;i++) { /* * Try to get a password for this host from file. We use host @@ -2045,7 +2720,7 @@ PQconnectPoll(PGconn *conn) libpq_gettext( "invalid connection state, " "probably indicative of memory corruption\n" - )); + )); goto error_return; } @@ -2197,7 +2872,7 @@ PQconnectPoll(PGconn *conn) #ifdef USE_SSL /* initialize these values based on SSL mode */ conn->allow_ssl_try = (conn->sslmode[0] != 'd'); /* "disable" */ - conn->wait_ssl_try = (conn->sslmode[0] == 'a'); /* "allow" */ + conn->wait_ssl_try = (conn->sslmode[0] == 'a');/* "allow" */ #endif reset_connection_state_machine = false; @@ -2260,7 +2935,7 @@ PQconnectPoll(PGconn *conn) { /* * Silently ignore socket() failure if we have more - * addresses to try; this reduces useless chatter in + * addresses to try;this reduces useless chatter in * cases where the address list includes both IPv4 and * IPv6 but kernel only accepts one family. */ @@ -2841,7 +3516,7 @@ PQconnectPoll(PGconn *conn) * auth requests may not be that small. Errors can be a * little larger, but not huge. If we see a large apparent * length in an error, it means we're really talking to a - * pre-3.0-protocol server; cope. + * pre-3.0-protocol server;cope. */ if (beresp == 'R' && (msgLength < 8 || msgLength > 2000)) { @@ -2856,13 +3531,13 @@ PQconnectPoll(PGconn *conn) if (beresp == 'E' && (msgLength < 8 || msgLength > 30000)) { /* Handle error from a pre-3.0 server */ - conn->inCursor = conn->inStart + 1; /* reread data */ + conn->inCursor = conn->inStart + 1;/* reread data */ if (pqGets_append(&conn->errorMessage, conn)) { /* We'll come back when there is more data */ return PGRES_POLLING_READING; } - /* OK, we read the message; mark data consumed */ + /* OK, we read the message;mark data consumed */ conn->inStart = conn->inCursor; /* @@ -2897,7 +3572,7 @@ PQconnectPoll(PGconn *conn) { /* * Before returning, try to enlarge the input buffer if - * needed to hold the whole message; see notes in + * needed to hold the whole message;see notes in * pqParseInput3. */ if (pqCheckInBufferSpace(conn->inCursor + (size_t) msgLength, @@ -2926,7 +3601,7 @@ PQconnectPoll(PGconn *conn) return PGRES_POLLING_READING; } } - /* OK, we read the message; mark data consumed */ + /* OK, we read the message;mark data consumed */ conn->inStart = conn->inCursor; /* Check to see if we should mention pgpassfile */ @@ -2995,7 +3670,7 @@ PQconnectPoll(PGconn *conn) { /* * Before returning, try to enlarge the input buffer - * if needed to hold the whole message; see notes in + * if needed to hold the whole message;see notes in * pqParseInput3. */ if (pqCheckInBufferSpace(conn->inCursor + (size_t) 4, @@ -3016,7 +3691,7 @@ PQconnectPoll(PGconn *conn) res = pg_fe_sendauth(areq, msgLength, conn); conn->errorMessage.len = strlen(conn->errorMessage.data); - /* OK, we have processed the message; mark data consumed */ + /* OK, we have processed the message;mark data consumed */ conn->inStart = conn->inCursor; if (res != STATUS_OK) @@ -3085,7 +3760,7 @@ PQconnectPoll(PGconn *conn) * close the connection and retry without sending * application_name. We could possibly get a false * SQLSTATE match here and retry uselessly, but there - * seems no great harm in that; we'll just get the + * seems no great harm in that;we'll just get the * same error again if it's unrelated. */ const char *sqlstate; @@ -3103,7 +3778,7 @@ PQconnectPoll(PGconn *conn) /* * if the resultStatus is FATAL, then conn->errorMessage - * already has a copy of the error; needn't copy it back. + * already has a copy of the error;needn't copy it back. * But add a newline if it's not there already, since * postmaster error messages may not have one. */ @@ -3287,7 +3962,7 @@ PQconnectPoll(PGconn *conn) val = PQgetvalue(res, 0, 0); if (strncmp(val, "on", 2) == 0) { - /* Not writable; fail this connection. */ + /* Not writable;fail this connection. */ const char *displayed_host; const char *displayed_port; @@ -3448,7 +4123,7 @@ internal_ping(PGconn *conn) /* * Any other SQLSTATE can be taken to indicate that the server is up. - * Presumably it didn't like our username, password, or database name; or + * Presumably it didn't like our username, password, or database name;or * perhaps it had some transient failure, but that should not be taken as * meaning "it's down". */ @@ -3544,7 +4219,7 @@ makeEmptyPGconn(void) * - free an idle (closed) PGconn data structure * * NOTE: this should not overlap any functionality with closePGconn(). - * Clearing/resetting of transient state belongs there; what we do here is + * Clearing/resetting of transient state belongs there;what we do here is * release data that is to be held for the life of the PGconn structure. * If a value ought to be cleared/freed during PQreset(), do it there not here. */ @@ -3554,7 +4229,7 @@ freePGconn(PGconn *conn) int i; /* let any event procs clean up their state data */ - for (i = 0; i < conn->nEvents; i++) + for (i = 0;i < conn->nEvents;i++) { PGEventConnDestroy evt; @@ -3567,7 +4242,7 @@ freePGconn(PGconn *conn) /* clean up pg_conn_host structures */ if (conn->connhost != NULL) { - for (i = 0; i < conn->nconnhost; ++i) + for (i = 0;i < conn->nconnhost;++i) { if (conn->connhost[i].host != NULL) free(conn->connhost[i].host); @@ -3652,6 +4327,7 @@ freePGconn(PGconn *conn) free(conn->rowBuf); if (conn->target_session_attrs) free(conn->target_session_attrs); + termPQExpBuffer(&conn->errorMessage); termPQExpBuffer(&conn->workBuffer); @@ -3746,8 +4422,11 @@ closePGconn(PGconn *conn) void PQfinish(PGconn *conn) { - if (conn) - { + if (conn!=NULL) + { + if((conn->load_balance!= NULL)&&(strcmp(conn->load_balance,"true")==0)) + YBupdateMap(conn->pghost,-1,true); + closePGconn(conn); freePGconn(conn); } @@ -3772,7 +4451,7 @@ PQreset(PGconn *conn) */ int i; - for (i = 0; i < conn->nEvents; i++) + for (i = 0;i < conn->nEvents;i++) { PGEventConnReset evt; @@ -3832,7 +4511,7 @@ PQresetPoll(PGconn *conn) */ int i; - for (i = 0; i < conn->nEvents; i++) + for (i = 0;i < conn->nEvents;i++) { PGEventConnReset evt; @@ -4037,7 +4716,7 @@ PQcancel(PGcancel *cancel, char *errbuf, int errbufsize) * * Returns true if able to send the cancel request, false if not. * - * On failure, the error message is saved in conn->errorMessage; this means + * On failure, the error message is saved in conn->errorMessage;this means * that this can't be used when there might be other active operations on * the connection object. * @@ -4080,7 +4759,7 @@ PQrequestCancel(PGconn *conn) * packets, which have no message type code.) * * buf, buf_len: contents of message. The given length includes only what - * is in buf; the message type and message length fields are added here. + * is in buf;the message type and message length fields are added here. * * RETURNS: STATUS_ERROR if the write fails, STATUS_OK otherwise. * SIDE_EFFECTS: may block. @@ -4197,7 +4876,7 @@ ldapServiceLookup(const char *purl, PQconninfoOption *options, /* hostname */ hostname = url + strlen(LDAP_URL); if (*hostname == '/') /* no hostname? */ - hostname = DefaultHost; /* the default */ + hostname = DefaultHost;/* the default */ /* dn, "distinguished name" */ p = strchr(url + strlen(LDAP_URL), '/'); @@ -4423,7 +5102,7 @@ ldapServiceLookup(const char *purl, PQconninfoOption *options, /* concatenate values into a single string with newline terminators */ size = 1; /* for the trailing null */ - for (i = 0; values[i] != NULL; i++) + for (i = 0;values[i] != NULL;i++) size += values[i]->bv_len + 1; if ((result = malloc(size)) == NULL) { @@ -4434,7 +5113,7 @@ ldapServiceLookup(const char *purl, PQconninfoOption *options, return 3; } p = result; - for (i = 0; values[i] != NULL; i++) + for (i = 0;values[i] != NULL;i++) { memcpy(p, values[i]->bv_val, values[i]->bv_len); p += values[i]->bv_len; @@ -4447,7 +5126,7 @@ ldapServiceLookup(const char *purl, PQconninfoOption *options, /* parse result string */ oldstate = state = 0; - for (p = result; *p != '\0'; ++p) + for (p = result;*p != '\0';++p) { switch (state) { @@ -4501,7 +5180,7 @@ ldapServiceLookup(const char *purl, PQconninfoOption *options, } else if (ld_is_nl_cr(*p)) { - optval = optname + strlen(optname); /* empty */ + optval = optname + strlen(optname);/* empty */ state = 0; } else if (!ld_is_sp_tab(*p)) @@ -4537,7 +5216,7 @@ ldapServiceLookup(const char *purl, PQconninfoOption *options, if (state == 0 && oldstate != 0) { found_keyword = false; - for (i = 0; options[i].keyword; i++) + for (i = 0;options[i].keyword;i++) { if (strcmp(options[i].keyword, optname) == 0) { @@ -4788,7 +5467,7 @@ parseServiceFile(const char *serviceFile, * explicit setting. */ found_keyword = false; - for (i = 0; options[i].keyword; i++) + for (i = 0;options[i].keyword;i++) { if (strcmp(options[i].keyword, key) == 0) { @@ -4882,7 +5561,7 @@ conninfo_init(PQExpBuffer errorMessage) } opt_dest = options; - for (cur_opt = PQconninfoOptions; cur_opt->keyword; cur_opt++) + for (cur_opt = PQconninfoOptions;cur_opt->keyword;cur_opt++) { /* Only copy the public part of the struct, not the full internal */ memcpy(opt_dest, cur_opt, sizeof(PQconninfoOption)); @@ -5190,7 +5869,7 @@ conninfo_array_parse(const char *const *keywords, const char *const *values, if (pvalue != NULL && pvalue[0] != '\0') { /* Search for the param record */ - for (option = options; option->keyword != NULL; option++) + for (option = options;option->keyword != NULL;option++) { if (strcmp(option->keyword, pname) == 0) break; @@ -5216,13 +5895,13 @@ conninfo_array_parse(const char *const *keywords, const char *const *values, { PQconninfoOption *str_option; - for (str_option = dbname_options; str_option->keyword != NULL; str_option++) + for (str_option = dbname_options;str_option->keyword != NULL;str_option++) { if (str_option->val != NULL) { int k; - for (k = 0; options[k].keyword; k++) + for (k = 0;options[k].keyword;k++) { if (strcmp(options[k].keyword, str_option->keyword) == 0) { @@ -5293,7 +5972,7 @@ conninfo_array_parse(const char *const *keywords, const char *const *values, * * Defaults are obtained from a service file, environment variables, etc. * - * Returns true if successful, otherwise false; errorMessage, if supplied, + * Returns true if successful, otherwise false;errorMessage, if supplied, * is filled in upon failure. Note that failure to locate a default value * is not an error condition here --- we just leave the option's value as * NULL. @@ -5316,7 +5995,7 @@ conninfo_add_defaults(PQconninfoOption *options, PQExpBuffer errorMessage) * Get the fallback resources for parameters not specified in the conninfo * string nor the service. */ - for (option = options; option->keyword != NULL; option++) + for (option = options;option->keyword != NULL;option++) { if (option->val != NULL) continue; /* Value was in conninfo or service */ @@ -5344,7 +6023,7 @@ conninfo_add_defaults(PQconninfoOption *options, PQExpBuffer errorMessage) * Interpret the deprecated PGREQUIRESSL environment variable. Per * tradition, translate values starting with "1" to sslmode=require, * and ignore other values. Given both PGREQUIRESSL=1 and PGSSLMODE, - * PGSSLMODE takes precedence; the opposite was true before v9.3. + * PGSSLMODE takes precedence;the opposite was true before v9.3. */ if (strcmp(option->keyword, "sslmode") == 0) { @@ -5383,7 +6062,7 @@ conninfo_add_defaults(PQconninfoOption *options, PQExpBuffer errorMessage) /* * Special handling for "user" option. Note that if pg_fe_getauthname - * fails, we just leave the value as NULL; there's no need for this to + * fails, we just leave the value as NULL;there's no need for this to * be an error condition if the caller provides a user name. The only * reason we do this now at all is so that callers of PQconndefaults * will see a correct default (barring error, of course). @@ -5632,7 +6311,7 @@ conninfo_uri_parse_options(PQconninfoOption *options, const char *uri, if (prevchar == ':') { - const char *port = ++p; /* advance past host terminator */ + const char *port = ++p;/* advance past host terminator */ while (*p && *p != '/' && *p != '?' && *p != ',') ++p; @@ -5746,7 +6425,7 @@ conninfo_uri_parse_params(char *params, else if (*p == '&' || *p == '\0') { /* - * If not at the end, cut off value and advance; leave p + * If not at the end, cut off value and advance;leave p * pointing to start of the next parameter, if any. */ if (*p != '\0') @@ -5796,7 +6475,7 @@ conninfo_uri_parse_params(char *params, } /* - * Store the value if the corresponding option exists; ignore + * Store the value if the corresponding option exists;ignore * otherwise. At this point both keyword and value are not * URI-encoded. */ @@ -6027,7 +6706,7 @@ conninfo_find(PQconninfoOption *connOptions, const char *keyword) { PQconninfoOption *option; - for (option = connOptions; option->keyword != NULL; option++) + for (option = connOptions;option->keyword != NULL;option++) { if (strcmp(option->keyword, keyword) == 0) return option; @@ -6060,7 +6739,7 @@ PQconninfo(PGconn *conn) { const internalPQconninfoOption *option; - for (option = PQconninfoOptions; option->keyword; option++) + for (option = PQconninfoOptions;option->keyword;option++) { char **connmember; @@ -6089,7 +6768,7 @@ PQconninfoFree(PQconninfoOption *connOptions) if (connOptions == NULL) return; - for (option = connOptions; option->keyword != NULL; option++) + for (option = connOptions;option->keyword != NULL;option++) { if (option->val != NULL) free(option->val); @@ -6204,7 +6883,7 @@ PQparameterStatus(const PGconn *conn, const char *paramName) if (!conn || !paramName) return NULL; - for (pstatus = conn->pstatus; pstatus != NULL; pstatus = pstatus->next) + for (pstatus = conn->pstatus;pstatus != NULL;pstatus = pstatus->next) { if (strcmp(pstatus->name, paramName) == 0) return pstatus->value; @@ -6428,7 +7107,7 @@ PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg) /* * The default notice message receiver just gets the standard notice text * and sends it to the notice processor. This two-level setup exists - * mostly for backwards compatibility; perhaps we should deprecate use of + * mostly for backwards compatibility;perhaps we should deprecate use of * PQsetNoticeProcessor? */ static void @@ -6544,7 +7223,7 @@ passwordFromFile(const char *hostname, const char *port, const char *dbname, if (stat_buf.st_mode & (S_IRWXG | S_IRWXO)) { fprintf(stderr, - libpq_gettext("WARNING: password file \"%s\" has group or world access; permissions should be u=rw (0600) or less\n"), + libpq_gettext("WARNING: password file \"%s\" has group or world access;permissions should be u=rw (0600) or less\n"), pgpassfile); return NULL; } @@ -6602,7 +7281,7 @@ passwordFromFile(const char *hostname, const char *port, const char *dbname, } /* De-escape password. */ - for (p1 = p2 = ret; *p1 != ':' && *p1 != '\0'; ++p1, ++p2) + for (p1 = p2 = ret;*p1 != ':' && *p1 != '\0';++p1, ++p2) { if (*p1 == '\\' && p1[1] != '\0') ++p1; @@ -6706,7 +7385,7 @@ default_threadlock(int acquire) if (singlethread_lock == NULL) { while (InterlockedExchange(&mutex_initlock, 1) == 1) - /* loop, another thread own the lock */ ; + /* loop, another thread own the lock */; if (singlethread_lock == NULL) { if (pthread_mutex_init(&singlethread_lock, NULL)) diff --git a/src/postgres/src/interfaces/libpq/libpq-int.h b/src/postgres/src/interfaces/libpq/libpq-int.h index c497d0e38021..477e0b8e3ccc 100644 --- a/src/postgres/src/interfaces/libpq/libpq-int.h +++ b/src/postgres/src/interfaces/libpq/libpq-int.h @@ -357,7 +357,9 @@ struct pg_conn char *sslrootcert; /* root certificate filename */ char *sslcrl; /* certificate revocation list filename */ char *requirepeer; /* required peer credentials for local sockets */ - + + char *topology_keys; /* Stores the details about the topology */ + char *load_balance ; /* For enabling the load_balancing feature */ #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) char *krbsrvname; /* Kerberos service name */ #endif @@ -519,6 +521,17 @@ struct pg_cancel */ extern char *const pgresStatus[]; +/* + * yb_connectLoadBalance function will be used to make any LoadBalanced connection + * Input - PGconn connection object + * 1. Check that the control connection is established + * 2. Consider the host with lowest number of connections and try to connect with it. + * 3. If the connection fails goto step 2 and repeat until all the available hosts are checked. + * 4. Once a connection is established return true else false if unable to establish connection with any host. + */ + +bool YBconnectLoadBalance(PGconn *conn ) ; +bool YBserverStatusChange(char *server_address , bool new_status , bool check_map_ready ) ; #ifdef USE_SSL