diff --git a/luigi/contrib/hive.py b/luigi/contrib/hive.py index ac07103329..189d96e987 100644 --- a/luigi/contrib/hive.py +++ b/luigi/contrib/hive.py @@ -469,54 +469,38 @@ def run_job(self, job, tracking_url_callback=None): return luigi.contrib.hadoop.run_and_track_hadoop_job(arglist, job.set_tracking_url) -class HiveTableTarget(luigi.Target): - """ - exists returns true if the table exists. - """ - - def __init__(self, table, database='default', client=None): - self.database = database - self.table = table - self.client = client or get_default_client() - - def exists(self): - logger.debug("Checking if Hive table '%s.%s' exists", self.database, self.table) - return self.client.table_exists(self.table, self.database) - - @property - def path(self): - """ - Returns the path to this table in HDFS. - """ - location = self.client.table_location(self.table, self.database) - if not location: - raise Exception("Couldn't find location for table: {0}".format(str(self))) - return location - - def open(self, mode): - return NotImplementedError("open() is not supported for HiveTableTarget") - - class HivePartitionTarget(luigi.Target): """ - exists returns true if the table's partition exists. + Target representing Hive table or Hive partition """ def __init__(self, table, partition, database='default', fail_missing_table=True, client=None): + """ + @param table: Table name + @type table: str + @param partition: partition specification in form of + dict of {"partition_column_1": "partition_value_1", "partition_column_2": "partition_value_2", ... } + If `partition` is `None` or `{}` then target is Hive nonpartitioned table + @param database: Database name + @param fail_missing_table: flag to ignore errors raised due to table nonexistence + @param client: `HiveCommandClient` instance. 
Default if `client is None` + """ self.database = database self.table = table self.partition = partition self.client = client or get_default_client() - self.fail_missing_table = fail_missing_table def exists(self): + """ + returns `True` if the partition/table exists + """ try: logger.debug( "Checking Hive table '{d}.{t}' for partition {p}".format( d=self.database, t=self.table, - p=str(self.partition) + p=str(self.partition or {}) ) ) @@ -542,15 +526,26 @@ def path(self): raise Exception("Couldn't find location for table: {0}".format(str(self))) return location - def open(self, mode): - return NotImplementedError("open() is not supported for HivePartitionTarget") + +class HiveTableTarget(HivePartitionTarget): + """ + Target representing non-partitioned table + """ + + def __init__(self, table, database='default', client=None): + super(HiveTableTarget, self).__init__( + table=table, + partition=None, + database=database, + fail_missing_table=False, + client=client, + ) class ExternalHiveTask(luigi.ExternalTask): """ External task that depends on a Hive table/partition. 
""" - database = luigi.Parameter(default='default') table = luigi.Parameter() partition = luigi.DictParameter( @@ -559,14 +554,8 @@ class ExternalHiveTask(luigi.ExternalTask): ) def output(self): - if self.partition: - return HivePartitionTarget( - database=self.database, - table=self.table, - partition=self.partition, - ) - else: - return HiveTableTarget( - database=self.database, - table=self.table, - ) + return HivePartitionTarget( + database=self.database, + table=self.table, + partition=self.partition, + ) diff --git a/test/contrib/hive_test.py b/test/contrib/hive_test.py index 7dac032baf..a50cd0f8f3 100644 --- a/test/contrib/hive_test.py +++ b/test/contrib/hive_test.py @@ -460,7 +460,7 @@ def test_hive_table_target(self): client = mock.Mock() target = luigi.contrib.hive.HiveTableTarget(database='db', table='foo', client=client) target.exists() - client.table_exists.assert_called_with('foo', 'db') + client.table_exists.assert_called_with('foo', 'db', None) def test_hive_partition_target(self): client = mock.Mock() @@ -480,9 +480,10 @@ class _Task(luigi.contrib.hive.ExternalHiveTask): output = _Task().output() # assert - assert isinstance(output, luigi.contrib.hive.HiveTableTarget) + assert isinstance(output, luigi.contrib.hive.HivePartitionTarget) assert output.database == 'schema1' assert output.table == 'table1' + assert output.partition == {} def test_partition_exists(self): # arrange