
Google BigQuery connector #3921

Open

ddillinger wants to merge 15 commits into elastic:main from ddillinger:bigquery-connector

Conversation

@ddillinger

Closes https://github.com/elastic/telemetry/issues/5711

[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] --- Counters ---
[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] 'bulk_item_responses.index' : 19640
[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] 'bulk_operations.index' : 19640
[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] 'deleted_document_count' : 0
[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] 'doc_creates_queued' : 19640
[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] 'docs_extracted' : 19640
[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] 'indexed_document_count' : 19640
[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] 'indexed_document_volume' : 34
[FMWK][20:20:06][INFO] [Connector id: YHIy4psBkeeBPafwWdsI, index name: connector-bigquery-822b, Sync job id: Y3I24psBkeeBPafwvNsE] 'result_successes' : 19640
GET connector-bigquery-822b/_count
{
  "count": 19640,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  }
}
POST connector-bigquery-822b/_search
{
  "query": {
    "match_all": {}
  },
  "size": 1
}

"hits": [
      {
        "_index": "connector-bigquery-822b",
        "_id": "d-n4GZPTRl2wzwm-DXdjFw",
        "_score": 1,
        "_source": {
          "trip_id": "ASP19GEN-1037-Sunday-00_066850_1..N03R",
          "direction_id": "0",
          "route_id": "1",
          "service_id": "ASP19GEN-1037-Sunday-00",
          "shape_id": "1..N03R",
          "id": "d-n4GZPTRl2wzwm-DXdjFw",
          "trip_headsign": "Van Cortlandt Park - 242 St",
          "_timestamp": "2026-01-21T20:19:59.569120+00:00",
          "block_id": null
        }
      }
    ]

Checklists

Pre-Review Checklist

  • this PR does NOT contain credentials of any kind, such as API keys or username/passwords (double check config.yml.example)
  • this PR has a meaningful title
  • this PR links to all relevant github issues that it fixes or partially addresses
  • this PR has a thorough description
  • Covered the changes with automated tests
  • Tested the changes locally
  • Added a label for each target release version (example: v7.13.2, v7.14.0, v8.0.0)
  • Considered corresponding documentation changes

Changes Requiring Extra Attention

  • New dependencies added.

Dependency: google-cloud-bigquery client library (PyPI, GitHub; Apache 2.0 license)

Dan Dillinger added 10 commits January 21, 2026 14:58
Unfortunately, indexes are created as part of the pre-setup workflow in Kibana. The user
has no opportunity to provide custom settings.

In fact, if you want custom settings (like we do, for "index.mode": "lookup"), you have
to let it create the index, pause before configuring, DELETE the index it created, and create
a new one with the same name, set up the way you want it.

You can't even pre-create it. The workflow fails on "index already exists." ಠ_ಠ

Anyway, yeah, so this can't actually happen here, so I removed it.
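
For reference, the workaround above looks roughly like this with the elasticsearch Python client (a sketch; the host, index name, and settings are illustrative):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
index = "connector-bigquery-822b"  # the index the Kibana workflow auto-created

# Drop the auto-created index, then recreate it under the same name
# with the custom settings the workflow gave us no chance to set.
es.indices.delete(index=index)
es.indices.create(index=index, settings={"index": {"mode": "lookup"}})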

fix project_id validation in terms of resolve_project()

use new style querying to implement ping()

implement row2dict()

 - allow a doc_id_column config for the ES doc _id
 - use a 22 char urlsafe uuid4 if none is configured, same as ES would do (see the sketch after this list)
 - allow timestamp_column to provide ES doc _timestamp
 - use run start timestamp if none is configured
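
A minimal sketch of that default id generation (standard library only; uuid4's 16 bytes base64url-encode to 24 characters, and stripping the two "=" padding characters leaves 22):

import base64
import uuid

def default_doc_id() -> str:
    # 16 random bytes -> 24-char urlsafe base64 -> strip "==" -> 22 chars,
    # matching the length and alphabet of Elasticsearch auto-generated ids.
    return base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("ascii").rstrip("=")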

implement set_logger() on client wrapper
test_get_docs()

test_ping_negative()
fix typo

unused as of now
@ddillinger self-assigned this Jan 21, 2026
@ddillinger requested a review from a team as a code owner January 21, 2026 20:35
Member

@artem-shelkovnikov left a comment


Thanks for the contribution, it already looks pretty good.

A couple things:

  1. I see that you specify a table to ingest. Do you also want to be able to specify a query to ingest results from? Do you want it by default, or do you want to have both?
  2. We have a functional test suite that attempts to run a connector against a "real"-ish source; where possible it uses a docker image of the 3rd-party system (like here). I don't think there's a docker image of BigQuery, but I've seen emulators of it. To merge a connector we need a functional test suite; is that something you can take a look at, too? (A sketch of pointing the client at an emulator follows below.)
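
For reference, the BigQuery Python client can be pointed at a local emulator roughly like this (a sketch; the goccy/bigquery-emulator image, endpoint, and port are assumptions, not part of this PR):

from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud import bigquery

# Assumes an emulator such as goccy/bigquery-emulator listening locally;
# AnonymousCredentials skips real Google auth against the fake endpoint.
client = bigquery.Client(
    project="test-project",
    client_options=ClientOptions(api_endpoint="http://localhost:9050"),
    credentials=AnonymousCredentials(),
)
print(list(client.query("SELECT 1 AS x").result()))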

@ddillinger
Author

Thanks so much for the review!

Re: 1, I suppose we could do that. My thinking at the time was that even if we wanted a query, we'd do it as a view in BQ rather than client-side, but I can see it either way, really. Can config options be made mutually exclusive in the UI? I know it would be validated too, but failing validation is a worse user experience.

Re: 2, TIL! 😁 Thanks for the example link, I will have a look at what is in there and see what I can figure out.

@artem-shelkovnikov
Member

Can config options be made mutually exclusive in the UI? I know it would be validated too, but failing validation is a worse user experience.

Yes! We've got mutually exclusive UI examples in other connectors, for example see https://github.com/elastic/connectors/blob/main/app/connectors_service/connectors/sources/sharepoint/sharepoint_online/datasource.py#L122-L147:

            "auth_method": {
                "label": "Authentication Method",
                "order": 4,
                "type": "str",
                "display": "dropdown",
                "options": [
                    {"label": "Client Secret", "value": "secret"},
                    {"label": "Certificate", "value": "certificate"},
                ],
                "value": "secret",
            },
            "secret_value": {
                "label": "Secret value",
                "order": 5,
                "sensitive": True,
                "type": "str",
                "depends_on": [{"field": "auth_method", "value": "secret"}],<<<<<<<< this gets rendered if auth_method is "secret"
            },
            "certificate": {
                "label": "Content of certificate file",
                "display": "textarea",
                "sensitive": True,
                "order": 6,
                "type": "str",
                "depends_on": [{"field": "auth_method", "value": "certificate"}], <<<<<<<< this gets rendered if auth_method is "certificate"
            },

@ddillinger
Author

[FMWK][18:24:51][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Filtering validation started
[FMWK][18:24:51][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Filtering validation result: FilteringValidationState.VALID
[FMWK][18:24:51][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Collecting local document ids
[FMWK][18:24:51][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Iterating on remote documents
[FMWK][18:24:51][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Connected to Google Bigquery.
[FMWK][18:24:53][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Sync progress -- created: 0 | updated: 0 | deleted: 0
[FMWK][18:24:53][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Sync progress -- created: 100 | updated: 0 | deleted: 0
[FMWK][18:24:53][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Sync progress -- created: 200 | updated: 0 | deleted: 0
[FMWK][18:24:53][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Sync progress -- created: 300 | updated: 0 | deleted: 0
[FMWK][18:24:53][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Sync progress -- created: 400 | updated: 0 | deleted: 0
[FMWK][18:24:54][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Both Extractor and Sink tasks are successfully stopped.
[FMWK][18:24:54][INFO] [Connector id: iA_WBZwBDdNmVXexN9yh, index name: connector-custom-b022, Sync job id: iw_ZBZwBDdNmVXexcNyX] Sync ended with status completed -- created: 496 | updated: 0 | deleted: 0 (took 3 seconds)

"required": true,
"options": [],
"validations": [],
"value": "{\"type\": \"service_account\", \"project_id\": \"dummy_project_id\", \"private_key_id\": \"abc\", \"private_key\": \"\\n-----BEGIN PRIVATE KEY-----\\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\\nNtBLSfcqPULqD+h7br9lEJio\\n-----END PRIVATE KEY-----\\n\", \"client_email\": \"123-abc@developer.gserviceaccount.com\", \"client_id\": \"123-abc.apps.googleusercontent.com\", \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\", \"token_uri\": \"http://localhost:4444/token\"}",
Member


Curious, is it actually used properly to auth to the emulated BigQuery? Is the PrivateKey valid and used, or does it not matter?

Author


In all seriousness, I copied how the google_cloud_storage connector config works. There is a validation for this which, at present, isn't guarded by a RUNNING_FTEST check or anything, so it does sort of get tested as of now.
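
For context, a minimal sketch of what that style of validation can look like (assuming the google-auth library; the helper name and scope are illustrative, not the connector's actual code):

import json

from google.oauth2 import service_account

def validate_service_account_json(raw: str) -> None:
    # Raises if the JSON is malformed, required fields (client_email,
    # token_uri, private_key) are missing, or the private key fails to parse.
    info = json.loads(raw)
    service_account.Credentials.from_service_account_info(
        info, scopes=["https://www.googleapis.com/auth/bigquery.readonly"]
    )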

Comment on lines +154 to +175
"columns": {
"display": "textarea",
"label": "Columns to fetch. Defaults to * if none are set.",
"order": 7,
"required": False,
"default_value": "*",
"type": "str",
"ui_restrictions": ["advanced"],
"tooltip": "Comma-separated, as in a SQL SELECT.",
"depends_on": [{"field": "query_type", "value": "table"}],
},
"predicates": {
"display": "textarea",
"label": "Predicates for the query.",
"order": 8,
"required": False,
"default_value": "",
"type": "str",
"ui_restrictions": ["advanced"],
"tooltip": "A SQL WHERE clause. May be required for some partitioned table configurations.",
"depends_on": [{"field": "query_type", "value": "table"}],
},
Member


We normally expose such things via advanced sync rules, see for example how MySQL does it:

https://github.com/elastic/connectors/blob/main/app/connectors_service/connectors/sources/mysql/datasource.py#L186

It's a little clunky in the UI, but that's how we've done it. I'll check with the team about it.

Curious about the way you implemented it - why not just allow the whole query to be put into config?

Author


Honestly, because the original issue requested a full table sync. Maybe the requirements mindtrapped me a little bit on it. 😆 It did seem appealingly easy to "just put a table in and it does the rest," but you'd know a whole lot more than I would about how users typically approach connectors, so if you think it's overkill and would cause more confusion than it solves, we can just take it out and go "write your query here" only.

Member


Our normal experience was indeed: provide a table name to sync, and the connector does everything.

As for the columns to choose, I think it makes sense to have a choice of:

  1. You choose a table to sync; you get * from that table.
  2. You choose a query to sync; there you can specify fields, WHERE clauses, and so on.

So it feels like we can drop columns and predicates to simplify the connector design/code a bit. (A sketch of that simplified shape follows below.)

Previously we provided it this way, but 2. was part of advanced sync rules rather than configuration values. I think the way you did it now makes sense, TBH: as part of configuration rather than advanced sync rules.
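
A hypothetical sketch of that simplified shape, reusing the depends_on pattern from the SharePoint example above (field names and labels are made up for illustration):

"query_type": {
    "label": "Sync by",
    "display": "dropdown",
    "options": [
        {"label": "Table", "value": "table"},
        {"label": "Custom query", "value": "query"},
    ],
    "type": "str",
    "value": "table",
},
"table": {
    "label": "Table to sync (SELECT * is used)",
    "type": "str",
    "depends_on": [{"field": "query_type", "value": "table"}],
},
"query": {
    "label": "Custom SQL query",
    "display": "textarea",
    "type": "str",
    "depends_on": [{"field": "query_type", "value": "query"}],
},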

"tooltip": "A SQL WHERE clause. May be required for some partitioned table configurations.",
"depends_on": [{"field": "query_type", "value": "table"}],
},
"doc_id_column": {
Member


Hmmm, does BQ not have a concept of ID columns, or any way to determine the PKs of a table? Or is this an override?

Author


It does not, not in the way that we would normally think of one. Docs.

  • They are not required or anything.
  • They can be specified, but primarily exist for query optimization.
  • BigQuery doesn't enforce primary and foreign key constraints, so BQ can't make guarantees about the results.
  • There isn't a user-exposed row identifier either. If you need one and don't have one, you have to do it yourself with something like ROW_NUMBER() OVER(PARTITION BY some_column ORDER BY timestamp DESC) AS row_num (or whatever is appropriate for your data, to get a reasonably deterministic outcome).

Member


I see. I'd recommend making this doc_id_columns then, to allow specifying multiple columns that become part of the id we use to ingest into Elasticsearch. (Something like the sketch below.)
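
A minimal sketch of combining several configured columns into a stable Elasticsearch _id (the helper name and hash choice are illustrative, not part of this PR):

import hashlib

def doc_id_from_columns(row: dict, id_columns: list[str]) -> str:
    # Join the configured column values and hash them, so the same row
    # always maps to the same Elasticsearch _id across syncs.
    key = "|".join(str(row[col]) for col in id_columns)
    return hashlib.sha256(key.encode("utf-8")).hexdigest()

# e.g. doc_id_from_columns(row, ["trip_id", "route_id"])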

@seanstory
Member

buildkite test this

