diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index c7fa586a5..0a05499ef 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -721,27 +721,53 @@ def merge_sql_types( if len(sql_types) == 1: return sql_types[0] + # Gathering Type to match variables + # sent in _adapt_column_type + current_type = sql_types[0] + # sql_type = sql_types[1] + + # Getting the length of each type + # current_type_len: int = getattr(sql_types[0], "length", 0) + sql_type_len: int = getattr(sql_types[1], "length", 0) + if sql_type_len is None: + sql_type_len = 0 + + # Convert the two types given into a sorted list + # containing the best conversion classes sql_types = self._sort_types(sql_types) + # If greater than two evaluate the first pair then on down the line if len(sql_types) > 2: return self.merge_sql_types( [self.merge_sql_types([sql_types[0], sql_types[1]])] + sql_types[2:] ) assert len(sql_types) == 2 - generic_type = type(sql_types[0].as_generic()) - if isinstance(generic_type, type): - if issubclass( - generic_type, - (sqlalchemy.types.String, sqlalchemy.types.Unicode), - ): - return sql_types[0] - - elif isinstance( - generic_type, - (sqlalchemy.types.String, sqlalchemy.types.Unicode), - ): - return sql_types[0] + # Get the generic type class + for opt in sql_types: + # Get the length + opt_len: int = getattr(opt, "length", 0) + generic_type = type(opt.as_generic()) + + if isinstance(generic_type, type): + if issubclass( + generic_type, + (sqlalchemy.types.String, sqlalchemy.types.Unicode), + ): + # If length None or 0 then is varchar max ? + if (opt_len is None) or (opt_len == 0): + return opt + elif isinstance( + generic_type, + (sqlalchemy.types.String, sqlalchemy.types.Unicode), + ): + # If length None or 0 then is varchar max ? + if (opt_len is None) or (opt_len == 0): + return opt + # If best conversion class is equal to current type + # return the best conversion class + elif str(opt) == str(current_type): + return opt raise ValueError( f"Unable to merge sql types: {', '.join([str(t) for t in sql_types])}" @@ -827,9 +853,21 @@ def _adapt_column_type( Raises: NotImplementedError: if altering columns is not supported. """ - current_type = self._get_column_type(full_table_name, column_name) + current_type: sqlalchemy.types.TypeEngine = self._get_column_type( + full_table_name, column_name + ) + + # Check if the existing column type and the sql type are the same + if str(sql_type) == str(current_type): + # The current column and sql type are the same + # Nothing to do + return + + # Not the same type, generic type or compatible types + # calling merge_sql_types for assistnace compatible_sql_type = self.merge_sql_types([current_type, sql_type]) - if current_type == compatible_sql_type: + + if str(compatible_sql_type) == str(current_type): # Nothing to do return