Using BigQuery with Workers AI
The easiest way to get started with Workers AI is to try it out in the Multi-modal Playground ↗ and the LLM playground ↗. If you decide that you want to integrate your code with Workers AI, you may then choose to do so via its REST API endpoints or through a Worker binding.
But what about the data? What if you want these models to ingest data that is stored outside Cloudflare?
In this tutorial, you will learn how to bring data from Google BigQuery into a Cloudflare Worker so that it can be used as input for Workers AI models.
You will need:
- A Cloudflare Worker project running a Hello World script.
- A Google Cloud Platform service account ↗ with an associated key ↗ file downloaded that has BigQuery read permissions.
- Access to a BigQuery table with some test data so that you can create a BigQuery job query ↗. For this tutorial, it is recommended that you create your own table, since sampled tables ↗, unless cloned into your own GCP namespace, will not allow you to run job queries against them. This example uses the Hacker News Corpus ↗, which is used under its MIT license.
To ingest the data into Cloudflare and feed it to Workers AI, you will use a Cloudflare Worker. If you have not created one yet, feel free to review our getting started guide.
After following the steps to create a Worker, you should have the following code in your new Worker project:

```js
export default {
  async fetch(request, env, ctx) {
    return new Response("Hello World!");
  },
};
```
If the Worker project was created successfully, you should also be able to run `npx wrangler dev` in a console to run the Worker locally:

```
[wrangler:inf] Ready on http://localhost:8787
```

Open a browser tab at `http://localhost:8787/` to see your deployed Worker. Note that the port `8787` may be different in your case.

You should see `Hello World!` in your browser:

```
Hello World!
```

If you encounter any issues during this step, make sure to review the Workers getting started guide.
Now that you have verified that the Worker was created successfully, you need to reference the Google Cloud Platform service key created in the prerequisites section of this tutorial.
The key JSON file you downloaded from Google Cloud Platform should have the following format:
{ "type": "service_account", "project_id": "<your_project_id>", "private_key_id": "<your_private_key_id>", "private_key": "<your_private_key>", "client_email": "<your_service_account_id>@<your_project_id>.iam.gserviceaccount.com", "client_id": "<your_oauth2_client_id>", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/<your_service_account_id>%40<your_project_id>.iam.gserviceaccount.com", "universe_domain": "googleapis.com"}
For this tutorial, you will only need the values of the following fields: `client_email`, `private_key`, `private_key_id`, and `project_id`.
Instead of storing this information in plain text in the Worker, you will use secrets to make sure its unencrypted content is only accessible via the Worker itself.
Import those values from the JSON file into secrets, starting with the field from the JSON key file called `client_email`, which will now be referred to as `BQ_CLIENT_EMAIL` (you may use a different variable name):
```sh
npx wrangler secret put BQ_CLIENT_EMAIL
```
You will be asked to enter a secret value, which will be the value of the `client_email` field in the JSON key file.
If the secret was uploaded successfully, the following message will be displayed:
```
✨ Success! Uploaded secret BQ_CLIENT_EMAIL
```
Now import the secrets for the remaining three fields: `private_key`, `private_key_id`, and `project_id` as `BQ_PRIVATE_KEY`, `BQ_PRIVATE_KEY_ID`, and `BQ_PROJECT_ID` respectively:
```sh
npx wrangler secret put BQ_PRIVATE_KEY
npx wrangler secret put BQ_PRIVATE_KEY_ID
npx wrangler secret put BQ_PROJECT_ID
```
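If you prefer not to copy the values by hand, a small Node.js helper such as the sketch below can print each required field from the key file. This is not part of the original tutorial, and the file name `service-account-key.json` is an assumption; adjust the path to wherever you saved your key:

```js
// print-bq-secrets.js — a minimal helper sketch; assumes the key file was
// saved as service-account-key.json next to this script (a hypothetical path).
import { readFileSync } from "node:fs";

const key = JSON.parse(readFileSync("service-account-key.json", "utf8"));

// Print only the four fields this tutorial needs, separated by a divider,
// so each value can be pasted into the matching `wrangler secret put` prompt.
for (const field of ["client_email", "private_key", "private_key_id", "project_id"]) {
  console.log(`${field}:\n${key[field]}\n---`);
}
```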
At this point, you have successfully imported four fields from the JSON key file downloaded from Google Cloud Platform into Cloudflare secrets to be used in a Worker.
Secrets are only made available to Workers once they are deployed. To make them available during development, create a `.dev.vars` file to store these credentials locally and reference them as environment variables.
Your `.dev.vars` file should look like this:
```
BQ_CLIENT_EMAIL="<your_service_account_id>@<your_project_id>.iam.gserviceaccount.com"
BQ_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----<content_of_your_private_key>-----END PRIVATE KEY-----\n"
BQ_PRIVATE_KEY_ID="<your_private_key_id>"
BQ_PROJECT_ID="<your_project_id>"
```
Make sure to add `.dev.vars` to your project's `.gitignore` file to prevent your credentials from being uploaded to a repository if you are using a version control system.
Check that the secrets are loaded correctly in `src/index.js` by logging their values to the console output:
```js
export default {
  async fetch(request, env, ctx) {
    console.log("BQ_CLIENT_EMAIL: ", env.BQ_CLIENT_EMAIL);
    console.log("BQ_PRIVATE_KEY: ", env.BQ_PRIVATE_KEY);
    console.log("BQ_PRIVATE_KEY_ID: ", env.BQ_PRIVATE_KEY_ID);
    console.log("BQ_PROJECT_ID: ", env.BQ_PROJECT_ID);
    return new Response("Hello World!");
  },
};
```
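Logging credentials, even locally, is easy to forget to remove. As a slightly safer variation (a sketch, not part of the original tutorial), you can log only whether each variable is present and how long it is:

```js
export default {
  async fetch(request, env, ctx) {
    // Log presence and length only, so secret values never reach the console.
    for (const name of ["BQ_CLIENT_EMAIL", "BQ_PRIVATE_KEY", "BQ_PRIVATE_KEY_ID", "BQ_PROJECT_ID"]) {
      console.log(`${name}: ${env[name] ? `set (${env[name].length} chars)` : "missing"}`);
    }
    return new Response("Hello World!");
  },
};
```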
Restart the Worker by running `npx wrangler dev` again. You should see that the server now mentions the newly added variables:
```
Using vars defined in .dev.vars
Your worker has access to the following bindings:
- Vars:
  - BQ_CLIENT_EMAIL: "(hidden)"
  - BQ_PRIVATE_KEY: "(hidden)"
  - BQ_PRIVATE_KEY_ID: "(hidden)"
  - BQ_PROJECT_ID: "(hidden)"
[wrangler:inf] Ready on http://localhost:8787
```
If you open `http://localhost:8787` in your browser, you should see the values of the variables in the console where you are running the `npx wrangler dev` command, while the browser window still only shows the `Hello World!` text.
You now have access to the GCP credentials from your Worker. Next, you will install a library to help with creating the JSON Web Token needed to interact with GCP's API.
To interact with BigQuery's REST API, you need to generate a JSON Web Token ↗ to authenticate your requests using the credentials you loaded into the Worker secrets in the previous step.
For this tutorial, you will use the jose ↗ library for JWT-related operations. Install it by running the following command in a console:
```sh
npm i jose
```
To verify that the installation succeeded, you can run `npm list`, which lists all installed packages, and check whether the `jose` dependency has been added:
```
<project_name>@0.0.0 /<path_to_your_project>/<project_name>
└── jose@<jose_version>
```
Now that you have installed the `jose` library, it is time to import it and add a function to your code that generates a signed JWT:
```js
import * as jose from "jose";
...
const generateBQJWT = async (env) => {
  const algorithm = "RS256";
  const audience = "https://bigquery.googleapis.com/";
  // Have the token expire one hour from now
  const expiryAt = new Date().valueOf() / 1000 + 3600;
  const privateKey = await jose.importPKCS8(env.BQ_PRIVATE_KEY, algorithm);

  // Generate signed JSON Web Token (JWT)
  return new jose.SignJWT()
    .setProtectedHeader({
      typ: "JWT",
      alg: algorithm,
      kid: env.BQ_PRIVATE_KEY_ID,
    })
    .setIssuer(env.BQ_CLIENT_EMAIL)
    .setSubject(env.BQ_CLIENT_EMAIL)
    .setAudience(audience)
    .setExpirationTime(expiryAt)
    .setIssuedAt()
    .sign(privateKey);
};

export default {
  async fetch(request, env, ctx) {
    ...
    // Create JWT to authenticate the BigQuery API call
    let bqJWT;
    try {
      bqJWT = await generateBQJWT(env);
    } catch (e) {
      return new Response("An error has occurred while generating the JWT", {
        status: 500,
      });
    }
  },
  ...
};
```
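If BigQuery later rejects your token, it can help to inspect the claims you are actually signing. This optional debugging sketch (not part of the original tutorial) uses jose's `decodeJwt` to print the payload during local development; call it as `inspectJWT(bqJWT)` right after generating the token:

```js
// Optional debugging sketch: decode (without verifying) the JWT you just
// generated, to confirm the iss, sub, aud, iat, and exp claims look right.
const inspectJWT = (jwt) => {
  const claims = jose.decodeJwt(jwt);
  console.log("JWT claims:", JSON.stringify(claims, null, 2));
};
```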
Now that you have created a JWT, it is time to make an API call to BigQuery to fetch some data.
Using the JWT you created in the previous step, issue an API request to BigQuery's API to retrieve data from a table.
You will now query the table you created in BigQuery in the prerequisites section of this tutorial. This example uses a sampled version of the Hacker News Corpus ↗, used under its MIT license, that was uploaded to BigQuery.
```js
const queryBQ = async (bqJWT, path) => {
  const bqEndpoint = `https://bigquery.googleapis.com${path}`;
  // In this example, text is a field in the BigQuery table that is being queried (hn.news_sampled)
  const query = "SELECT text FROM hn.news_sampled LIMIT 3";
  const response = await fetch(bqEndpoint, {
    method: "POST",
    body: JSON.stringify({
      query: query,
    }),
    headers: {
      Authorization: `Bearer ${bqJWT}`,
    },
  });
  return response.json();
};
...
export default {
  async fetch(request, env, ctx) {
    ...
    let ticketInfo;
    try {
      ticketInfo = await queryBQ(
        bqJWT,
        `/bigquery/v2/projects/${env.BQ_PROJECT_ID}/queries`,
      );
    } catch (e) {
      return new Response("An error has occurred while querying BQ", {
        status: 500,
      });
    }
    ...
  },
};
```
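The `queryBQ` function above assumes the HTTP call succeeds. A more defensive variation, sketched below under the same assumptions, checks `response.ok` first so that authentication or query errors surface with their original status code instead of failing later when the JSON body is parsed:

```js
const queryBQ = async (bqJWT, path) => {
  const bqEndpoint = `https://bigquery.googleapis.com${path}`;
  const query = "SELECT text FROM hn.news_sampled LIMIT 3";
  const response = await fetch(bqEndpoint, {
    method: "POST",
    body: JSON.stringify({ query }),
    headers: { Authorization: `Bearer ${bqJWT}` },
  });
  // Surface BigQuery errors (bad credentials, malformed query) early.
  if (!response.ok) {
    throw new Error(`BigQuery responded with ${response.status}: ${await response.text()}`);
  }
  return response.json();
};
```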
With the raw row data retrieved from BigQuery, you can now format it into a friendlier, JSON-like structure, which is the next step.
Now that you have retrieved the data from BigQuery, note that the body of a BigQuery API response looks something like this:
{ ... "schema": { "fields": [ { "name": "title", "type": "STRING", "mode": "NULLABLE" }, { "name": "text", "type": "STRING", "mode": "NULLABLE" } ] }, ... "rows": [ { "f": [ { "v": "<some_value>" }, { "v": "<some_value>" } ] }, { "f": [ { "v": "<some_value>" }, { "v": "<some_value>" } ] }, { "f": [ { "v": "<some_value>" }, { "v": "<some_value>" } ] } ], ...}
This format can be difficult to read and to work with when iterating through the results, which you will do later in this tutorial. So you will now implement a function that maps the schema onto each individual value; the resulting output is easier to read, as shown below. Each row corresponds to an object within an array.
```js
[
  {
    title: "<some_value>",
    text: "<some_value>",
  },
  {
    title: "<some_value>",
    text: "<some_value>",
  },
  {
    title: "<some_value>",
    text: "<some_value>",
  },
];
```
Create a `formatRows` function that takes a number of rows and fields returned from the BigQuery response body and returns an array of results as objects with named fields.
```js
const formatRows = (rowsWithoutFieldNames, fields) => {
  // The position of each value determines which field it should be assigned to.
  const fieldsByIndex = new Map();

  // Load all field names, keyed by their index in the results array
  fields.forEach((field, index) => {
    fieldsByIndex.set(index, field.name);
  });

  // Iterate through rows
  const rowsWithFieldNames = rowsWithoutFieldNames.map((row) => {
    // For each row, represented by an array f, iterate through its unnamed
    // values and look up their field names in fieldsByIndex.
    let newRow = {};
    row.f.forEach((field, index) => {
      const fieldName = fieldsByIndex.get(index);
      if (fieldName) {
        // For every field in a row, add it to newRow
        newRow = { ...newRow, [fieldName]: field.v };
      }
    });
    return newRow;
  });

  return rowsWithFieldNames;
};
```
```js
export default {
  async fetch(request, env, ctx) {
    ...
    // Transform output format into an array of objects with named fields
    let formattedResults;

    if ("rows" in ticketInfo) {
      formattedResults = formatRows(ticketInfo.rows, ticketInfo.schema.fields);
      console.log(formattedResults);
    } else if ("error" in ticketInfo) {
      return new Response(ticketInfo.error.message, { status: 500 });
    }
    ...
  },
};
```
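As a quick sanity check, you can run `formatRows` against a hand-written response fragment and confirm the mapping. The sample values below are made up for illustration:

```js
// Hypothetical fragment shaped like the BigQuery response body shown above.
const sampleFields = [{ name: "title" }, { name: "text" }];
const sampleRows = [{ f: [{ v: "A title" }, { v: "Some text" }] }];

console.log(formatRows(sampleRows, sampleFields));
// => [ { title: "A title", text: "Some text" } ]
```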
Now that you have converted the response from the BigQuery API into an array of results, generate some tags and attach an associated sentiment score using an LLM via Workers AI:
```js
const generateTags = (data, env) => {
  return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
    prompt: `Create three one-word tags for the following text. return only these three tags separated by a comma. don't return text that is not a category.Lowercase only. ${JSON.stringify(data)}`,
  });
};

const generateSentimentScore = (data, env) => {
  return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
    prompt: `return a float number between 0 and 1 measuring the sentiment of the following text. 0 being negative and 1 positive. return only the number, no text. ${JSON.stringify(data)}`,
  });
};

// Iterates through the values, sends them to an AI handler, and wraps all
// responses into a single Promise
const getAIGeneratedContent = (data, env, aiHandler) => {
  let results = data?.map((dataPoint) => {
    return aiHandler(dataPoint, env);
  });
  return Promise.all(results);
};
...
export default {
  async fetch(request, env, ctx) {
    ...
    let summaries, sentimentScores;
    try {
      summaries = await getAIGeneratedContent(formattedResults, env, generateTags);
      sentimentScores = await getAIGeneratedContent(
        formattedResults,
        env,
        generateSentimentScore,
      );
    } catch {
      return new Response(
        "There was an error while generating the text summaries or sentiment scores",
      );
    }

    // Add the AI-generated tags and sentiment scores to the previous results
    formattedResults = formattedResults?.map((formattedResult, i) => {
      if (sentimentScores[i].response && summaries[i].response) {
        return {
          ...formattedResult,
          sentiment: parseFloat(sentimentScores[i].response).toFixed(2),
          tags: summaries[i].response.split(",").map((result) => result.trim()),
        };
      }
    });
    ...
  },
};
```
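Since the tag and sentiment requests are independent of each other, you could also issue them concurrently. This is an optional optimization sketch, not part of the original flow, and is a drop-in replacement for the sequential `try` block above:

```js
// Optional variation: run both AI passes concurrently rather than one after
// the other, halving the wall-clock time spent waiting on Workers AI.
let summaries, sentimentScores;
try {
  [summaries, sentimentScores] = await Promise.all([
    getAIGeneratedContent(formattedResults, env, generateTags),
    getAIGeneratedContent(formattedResults, env, generateSentimentScore),
  ]);
} catch {
  return new Response(
    "There was an error while generating the text summaries or sentiment scores",
  );
}
```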
Uncomment the following lines in the Wrangler file of your project. If you are using a `wrangler.jsonc` file:

```json
{
  "ai": {
    "binding": "AI"
  }
}
```

If you are using a `wrangler.toml` file:

```toml
[ai]
binding = "AI"
```
Restart the Worker that is running locally, and after doing so, go to your application endpoint:
```sh
curl http://localhost:8787
```
When using Workers AI, it is likely that you will be asked to log in to your Cloudflare account and grant Wrangler (the Cloudflare CLI) temporary access to use your account.
Once you access `http://localhost:8787`, you should see an output similar to the following:
{ "data": [ { "text": "You can see a clear spike in submissions right around US Thanksgiving.", "sentiment": "0.61", "tags": [ "trends", "submissions", "thanksgiving" ] }, { "text": "I didn't test the changes before I published them. I basically did development on the running server. In fact for about 30 seconds the comments page was broken due to a bug.", "sentiment": "0.35", "tags": [ "software", "deployment", "error" ] }, { "text": "I second that. As I recall, it's a very enjoyable 700-page brain dump by someone who's really into his subject. The writing has a personal voice; there are lots of asides, dry wit, and typos that suggest restrained editing. The discussion is intelligent and often theoretical (and Bartle is not scared to use mathematical metaphors), but the tone is not academic.", "sentiment": "0.86", "tags": [ "review", "game", "design" ] } ]}
The actual values and fields will mostly depend on the query made in Step 5, whose results are then fed to the LLM models.
All the code shown in the different steps is combined into the following code in `src/index.js`:
```js
import * as jose from "jose";

const generateBQJWT = async (env) => {
  const algorithm = "RS256";
  const audience = "https://bigquery.googleapis.com/";
  // Have the token expire one hour from now
  const expiryAt = new Date().valueOf() / 1000 + 3600;
  const privateKey = await jose.importPKCS8(env.BQ_PRIVATE_KEY, algorithm);

  // Generate signed JSON Web Token (JWT)
  return new jose.SignJWT()
    .setProtectedHeader({
      typ: "JWT",
      alg: algorithm,
      kid: env.BQ_PRIVATE_KEY_ID,
    })
    .setIssuer(env.BQ_CLIENT_EMAIL)
    .setSubject(env.BQ_CLIENT_EMAIL)
    .setAudience(audience)
    .setExpirationTime(expiryAt)
    .setIssuedAt()
    .sign(privateKey);
};

const queryBQ = async (bqJWT, path) => {
  const bqEndpoint = `https://bigquery.googleapis.com${path}`;
  const query = "SELECT text FROM hn.news_sampled LIMIT 3";
  const response = await fetch(bqEndpoint, {
    method: "POST",
    body: JSON.stringify({
      query: query,
    }),
    headers: {
      Authorization: `Bearer ${bqJWT}`,
    },
  });
  return response.json();
};

const formatRows = (rowsWithoutFieldNames, fields) => {
  // Index to fieldName
  const fieldsByIndex = new Map();

  fields.forEach((field, index) => {
    fieldsByIndex.set(index, field.name);
  });

  const rowsWithFieldNames = rowsWithoutFieldNames.map((row) => {
    // Map rows into an array of objects with field names
    let newRow = {};
    row.f.forEach((field, index) => {
      const fieldName = fieldsByIndex.get(index);
      if (fieldName) {
        newRow = { ...newRow, [fieldName]: field.v };
      }
    });
    return newRow;
  });

  return rowsWithFieldNames;
};

const generateTags = (data, env) => {
  return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
    prompt: `Create three one-word tags for the following text. return only these three tags separated by a comma. don't return text that is not a category.Lowercase only. ${JSON.stringify(data)}`,
  });
};

const generateSentimentScore = (data, env) => {
  return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
    prompt: `return a float number between 0 and 1 measuring the sentiment of the following text. 0 being negative and 1 positive. return only the number, no text. ${JSON.stringify(data)}`,
  });
};

const getAIGeneratedContent = (data, env, aiHandler) => {
  let results = data?.map((dataPoint) => {
    return aiHandler(dataPoint, env);
  });
  return Promise.all(results);
};

export default {
  async fetch(request, env, ctx) {
    // Create JWT to authenticate the BigQuery API call
    let bqJWT;
    try {
      bqJWT = await generateBQJWT(env);
    } catch (error) {
      console.log(error);
      return new Response("An error has occurred while generating the JWT", {
        status: 500,
      });
    }

    // Fetch results from BigQuery
    let ticketInfo;
    try {
      ticketInfo = await queryBQ(
        bqJWT,
        `/bigquery/v2/projects/${env.BQ_PROJECT_ID}/queries`,
      );
    } catch (error) {
      console.log(error);
      return new Response("An error has occurred while querying BQ", {
        status: 500,
      });
    }

    // Transform output format into an array of objects with named fields
    let formattedResults;
    if ("rows" in ticketInfo) {
      formattedResults = formatRows(ticketInfo.rows, ticketInfo.schema.fields);
    } else if ("error" in ticketInfo) {
      return new Response(ticketInfo.error.message, { status: 500 });
    }

    // Generate AI tags and sentiment scores
    let summaries, sentimentScores;
    try {
      summaries = await getAIGeneratedContent(
        formattedResults,
        env,
        generateTags,
      );
      sentimentScores = await getAIGeneratedContent(
        formattedResults,
        env,
        generateSentimentScore,
      );
    } catch {
      return new Response(
        "There was an error while generating the text summaries or sentiment scores",
      );
    }

    // Add AI tags and sentiment scores to the previous results
    formattedResults = formattedResults?.map((formattedResult, i) => {
      if (sentimentScores[i].response && summaries[i].response) {
        return {
          ...formattedResult,
          sentiment: parseFloat(sentimentScores[i].response).toFixed(2),
          tags: summaries[i].response.split(",").map((result) => result.trim()),
        };
      }
    });

    const response = { data: formattedResults };

    return new Response(JSON.stringify(response), {
      headers: { "Content-Type": "application/json" },
    });
  },
};
```
If you wish to deploy this Worker, you can do so by running `npx wrangler deploy`:
```
Total Upload: <size_of_your_worker> KiB / gzip: <compressed_size_of_your_worker> KiB
Uploaded <name_of_your_worker> (x sec)
Deployed <name_of_your_worker> triggers (x sec)
  https://<your_public_worker_endpoint>
Current Version ID: <worker_script_version_id>
```
This will create a public endpoint that you can use to access the Worker globally. Keep this in mind when using production data, and make sure to put additional access controls in place.
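What such a control looks like depends on your setup. As one minimal sketch, you could require a shared-secret header before running the pipeline; `ACCESS_TOKEN` here is a hypothetical extra secret, not part of this tutorial:

```js
export default {
  async fetch(request, env, ctx) {
    // ACCESS_TOKEN is a hypothetical secret, added with
    // `npx wrangler secret put ACCESS_TOKEN`, that callers must echo back.
    if (request.headers.get("X-Access-Token") !== env.ACCESS_TOKEN) {
      return new Response("Unauthorized", { status: 401 });
    }
    // ...the rest of the Worker logic from this tutorial...
    return new Response("OK");
  },
};
```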
In this tutorial, you have learnt how to integrate Google BigQuery and Cloudflare Workers by creating a GCP service account key and storing parts of it as Worker secrets. These were then read in the code, and, using the `jose` npm library, you created a JSON Web Token to authenticate your API query to BigQuery.
Once you obtained the results, you formatted them to later be passed to generative AI models via Workers AI to generate tags and to perform sentiment analysis on the extracted data.
If, instead of displaying the results of feeding the data to the AI model in a browser, your workflow requires fetching and storing data (for example, in R2 or D1) at regular intervals, you may want to consider adding a scheduled handler to this Worker. It allows you to trigger the Worker at a predefined cadence via a Cron Trigger; a minimal sketch follows below. Consider reviewing the Reference Architecture Diagrams on Ingesting BigQuery Data into Workers AI.
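Here is a minimal sketch of such a handler, assuming a daily cadence and reusing the `generateBQJWT` and `queryBQ` functions from this tutorial; the cron expression and the storage step are assumptions to adapt to your workflow:

```js
export default {
  // Runs on the cadence defined by a Cron Trigger in your Wrangler file,
  // for example: [triggers] crons = ["0 6 * * *"] (every day at 06:00 UTC).
  async scheduled(controller, env, ctx) {
    const bqJWT = await generateBQJWT(env);
    const ticketInfo = await queryBQ(
      bqJWT,
      `/bigquery/v2/projects/${env.BQ_PROJECT_ID}/queries`,
    );
    // Instead of returning a Response, persist the results here,
    // for example into R2 or D1.
    console.log(`Fetched ${ticketInfo.rows?.length ?? 0} rows from BigQuery`);
  },
  async fetch(request, env, ctx) {
    return new Response("OK");
  },
};
```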
One use case for ingesting data from other sources, like you did in this tutorial, is building a RAG system. If this sounds relevant to you, check out the tutorial Build a Retrieval Augmented Generation (RAG) AI.
To learn more about what other AI models you can use at Cloudflare, please visit the Workers AI section of our docs.