Skip to content

Commit

Permalink
Merge pull request #2532 from Roviluca/main
Browse files Browse the repository at this point in the history
Permit Bigquery output to create jobs in a different project
  • Loading branch information
Jeffail authored Sep 5, 2024
2 parents b2ca626 + 6a5ca54 commit 4db3939
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
11 changes: 11 additions & 0 deletions docs/modules/components/pages/outputs/gcp_bigquery.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ output:
label: ""
gcp_bigquery:
project: ""
job_project: ""
dataset: "" # No default (required)
table: "" # No default (required)
format: NEWLINE_DELIMITED_JSON
Expand Down Expand Up @@ -67,6 +68,7 @@ output:
label: ""
gcp_bigquery:
project: ""
job_project: ""
dataset: "" # No default (required)
table: "" # No default (required)
format: NEWLINE_DELIMITED_JSON
Expand Down Expand Up @@ -145,6 +147,15 @@ This output benefits from sending messages as a batch for improved performance.
The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.
*Type*: `string`
*Default*: `""`
=== `job_project`
The project ID in which jobs will be executed. If not set, the value of `project` will be used.
*Type*: `string`
*Default*: `""`
Expand Down
16 changes: 12 additions & 4 deletions internal/impl/gcp/output_bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func gcpBigQueryCSVConfigFromParsed(conf *service.ParsedConfig) (csvconf gcpBigQ
}

type gcpBigQueryOutputConfig struct {
JobProjectID string
ProjectID string
DatasetID string
TableID string
Expand All @@ -85,6 +86,12 @@ func gcpBigQueryOutputConfigFromParsed(conf *service.ParsedConfig) (gconf gcpBig
if gconf.ProjectID == "" {
gconf.ProjectID = bigquery.DetectProjectID
}
if gconf.JobProjectID, err = conf.FieldString("job_project"); err != nil {
return
}
if gconf.JobProjectID == "" {
gconf.JobProjectID = gconf.ProjectID
}
if gconf.DatasetID, err = conf.FieldString("dataset"); err != nil {
return
}
Expand Down Expand Up @@ -131,9 +138,9 @@ func (g gcpBQClientURL) NewClient(ctx context.Context, conf gcpBigQueryOutputCon
if err != nil {
return nil, err
}
return bigquery.NewClient(ctx, conf.ProjectID, opt...)
return bigquery.NewClient(ctx, conf.JobProjectID, opt...)
}
return bigquery.NewClient(ctx, conf.ProjectID, option.WithoutAuthentication(), option.WithEndpoint(string(g)))
return bigquery.NewClient(ctx, conf.JobProjectID, option.WithoutAuthentication(), option.WithEndpoint(string(g)))
}

func gcpBigQueryConfig() *service.ConfigSpec {
Expand Down Expand Up @@ -179,6 +186,7 @@ The same is true for the CSV format.
For the CSV format when the field ` + "`csv.header`" + ` is specified a header row will be inserted as the first line of each message batch. If this field is not provided then the first message of each message batch must include a header line.` + service.OutputPerformanceDocs(true, true)).
Field(service.NewStringField("project").Description("The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.").Default("")).
Field(service.NewStringField("job_project").Description("The project ID in which jobs will be exectuted. If not set, project will be used.").Default("")).
Field(service.NewStringField("dataset").Description("The BigQuery Dataset ID.")).
Field(service.NewStringField("table").Description("The table to insert messages to.")).
Field(service.NewStringEnumField("format", string(bigquery.JSON), string(bigquery.CSV)).
Expand Down Expand Up @@ -337,7 +345,7 @@ func (g *gcpBigQueryOutput) Connect(ctx context.Context) (err error) {
}
}()

dataset := client.DatasetInProject(client.Project(), g.conf.DatasetID)
dataset := client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID)
if _, err = dataset.Metadata(ctx); err != nil {
if hasStatusCode(err, http.StatusNotFound) {
err = fmt.Errorf("dataset does not exist: %v", g.conf.DatasetID)
Expand Down Expand Up @@ -410,7 +418,7 @@ func (g *gcpBigQueryOutput) WriteBatch(ctx context.Context, batch service.Messag
}

func (g *gcpBigQueryOutput) createTableLoader(data *[]byte) *bigquery.Loader {
table := g.client.DatasetInProject(g.client.Project(), g.conf.DatasetID).Table(g.conf.TableID)
table := g.client.DatasetInProject(g.conf.ProjectID, g.conf.DatasetID).Table(g.conf.TableID)

source := bigquery.NewReaderSource(bytes.NewReader(*data))
source.SourceFormat = bigquery.DataFormat(g.conf.Format)
Expand Down

0 comments on commit 4db3939

Please sign in to comment.