Cloudflare Docs

Using BigQuery with Workers AI

Last reviewed: 8 months ago

The easiest way to get started with Workers AI is to try it out in the Multi-modal Playground or the LLM playground. If you decide that you want to integrate your code with Workers AI, you may then choose to use its REST API endpoints or a Worker binding.

But what about the data? What if you want these models to ingest data that is stored outside Cloudflare?

In this tutorial, you will learn how to bring data from Google BigQuery into a Cloudflare Worker so that you can use it as input for Workers AI models.

Prerequisites

You will need:

1. Set up your Cloudflare Worker

To ingest the data into Cloudflare and make it available to Workers AI, you will use a Cloudflare Worker. If you have not created one yet, feel free to review our getting started guide.

After following the steps to create a Worker, you should have the following code in your new Worker project:

export default {
  async fetch(request, env, ctx) {
    return new Response("Hello World!");
  },
};

If the Worker project was created successfully, you should also be able to run npx wrangler dev in your console to run the Worker locally:

Terminal window
[wrangler:inf] Ready on http://localhost:8787

Open a browser tab at http://localhost:8787/ to see your deployed Worker. Note that port 8787 may be different in your case.

You should see Hello World! in your browser:

Terminal window
Hello World!

If you run into any issues during this step, make sure to check out the Workers getting started guide.

2. Import the GCP service key into the Worker as secrets

Now that you have verified that the Worker was created successfully, you need to reference the Google Cloud Platform service key created in the Prerequisites section of this tutorial.

The key JSON file you downloaded from Google Cloud Platform should have the following format:

{
  "type": "service_account",
  "project_id": "<your_project_id>",
  "private_key_id": "<your_private_key_id>",
  "private_key": "<your_private_key>",
  "client_email": "<your_service_account_id>@<your_project_id>.iam.gserviceaccount.com",
  "client_id": "<your_oauth2_client_id>",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/<your_service_account_id>%40<your_project_id>.iam.gserviceaccount.com",
  "universe_domain": "googleapis.com"
}

For this tutorial, you will only need the values of the following fields: client_email, private_key, private_key_id, and project_id.

Instead of storing this information in plain text in the Worker, you will use secrets to make sure its unencrypted content is only accessible via the Worker itself.

Import these values from the JSON file into secrets, starting with the field from the JSON key file called client_email, which we will now refer to as BQ_CLIENT_EMAIL (you can use another variable name):

Terminal window
npx wrangler secret put BQ_CLIENT_EMAIL

You will be asked to enter a secret value, which will be the value of the client_email field in the JSON key file.

If the secret was uploaded successfully, the following message will be displayed:

Terminal window
Success! Uploaded secret BQ_CLIENT_EMAIL

Now import the secrets for the three remaining fields; private_key, private_key_id, and project_id as BQ_PRIVATE_KEY, BQ_PRIVATE_KEY_ID, and BQ_PROJECT_ID respectively:

Terminal window
npx wrangler secret put BQ_PRIVATE_KEY
Terminal window
npx wrangler secret put BQ_PRIVATE_KEY_ID
Terminal window
npx wrangler secret put BQ_PROJECT_ID

At this point, you have successfully imported the four fields from the JSON key file you downloaded from Google Cloud Platform into Cloudflare secrets to be used in a Worker.

Secrets are only made available to Workers once they are deployed. To make them available during development, create a .dev.vars file to store these credentials locally and reference them as environment variables.

Your .dev.vars file should look like the following:

BQ_CLIENT_EMAIL="<your_service_account_id>@<your_project_id>.iam.gserviceaccount.com"
BQ_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----<content_of_your_private_key>-----END PRIVATE KEY-----\n"
BQ_PRIVATE_KEY_ID="<your_private_key_id>"
BQ_PROJECT_ID="<your_project_id>"

Make sure to add .dev.vars to your project's .gitignore file to prevent your credentials from being uploaded to a repository if you are using a version control system.
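For example, from the project root:

```shell
# Append .dev.vars to .gitignore so local credentials never reach the repository.
echo ".dev.vars" >> .gitignore
```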

Check that the secrets are loaded correctly in src/index.js by logging their values to the console output:

export default {
  async fetch(request, env, ctx) {
    console.log("BQ_CLIENT_EMAIL: ", env.BQ_CLIENT_EMAIL);
    console.log("BQ_PRIVATE_KEY: ", env.BQ_PRIVATE_KEY);
    console.log("BQ_PRIVATE_KEY_ID: ", env.BQ_PRIVATE_KEY_ID);
    console.log("BQ_PROJECT_ID: ", env.BQ_PROJECT_ID);
    return new Response("Hello World!");
  },
};

Restart the Worker by running npx wrangler dev. You should see that the server now mentions the newly added variables:

Using vars defined in .dev.vars
Your worker has access to the following bindings:
- Vars:
  - BQ_CLIENT_EMAIL: "(hidden)"
  - BQ_PRIVATE_KEY: "(hidden)"
  - BQ_PRIVATE_KEY_ID: "(hidden)"
  - BQ_PROJECT_ID: "(hidden)"
[wrangler:inf] Ready on http://localhost:8787

If you open http://localhost:8787 in your browser, you should see the values of the variables in the console where you ran the npx wrangler dev command, while still only seeing the Hello World! text in the browser window.

You now have access to the GCP credentials from the Worker. Next, you will install a library to help create the JSON Web Token needed to interact with GCP's API.

3. Install a library to handle JWT operations

To interact with BigQuery's REST API, you need to generate a JSON Web Token to authenticate your requests using the credentials you loaded into Worker secrets in the previous step.

For this tutorial, you will use the jose library for JWT-related operations. Install it by running the following command in your console:

Terminal window
npm i jose

To verify that the installation succeeded, you can run npm list, which lists all the installed packages, and check that the jose dependency has been added:

Terminal window
<project_name>@0.0.0
/<path_to_your_project>/<project_name>
├── @cloudflare/<package>@<version>
└── jose@<version>

4. Generate the JSON Web Token

Now that you have installed the jose library, it is time to import it and add a function to your code that generates a signed JWT:

import * as jose from 'jose';
...
const generateBQJWT = async (env) => {
  const algorithm = "RS256";
  const audience = "https://bigquery.googleapis.com/";
  const expiryAt = (new Date().valueOf() / 1000);
  const privateKey = await jose.importPKCS8(env.BQ_PRIVATE_KEY, algorithm);

  // Generate signed JSON Web Token (JWT)
  return new jose.SignJWT()
    .setProtectedHeader({
      typ: 'JWT',
      alg: algorithm,
      kid: env.BQ_PRIVATE_KEY_ID,
    })
    .setIssuer(env.BQ_CLIENT_EMAIL)
    .setSubject(env.BQ_CLIENT_EMAIL)
    .setAudience(audience)
    .setExpirationTime(expiryAt)
    .setIssuedAt()
    .sign(privateKey);
};

export default {
  async fetch(request, env, ctx) {
    ...
    // Create a JWT to authenticate the BigQuery API call
    let bqJWT;
    try {
      bqJWT = await generateBQJWT(env);
    } catch (e) {
      return new Response('An error has occurred while generating the JWT', { status: 500 });
    }
  },
  ...
};
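Once signed, the token encodes a header and a claim set along these lines. The values below are illustrative placeholders, not output from the code above; the actual token is three base64url-encoded parts joined by dots:

```javascript
// Illustrative decoded contents of the signed JWT; all values are placeholders.
const header = {
  typ: "JWT",
  alg: "RS256", // must match the algorithm passed to importPKCS8
  kid: "<your_private_key_id>",
};

const claims = {
  iss: "<service_account_email>", // issuer: the service account
  sub: "<service_account_email>", // subject: the same service account
  aud: "https://bigquery.googleapis.com/", // audience: the BigQuery API
  iat: 1730000000, // issued-at, seconds since epoch
  exp: 1730000000, // expiration, set via setExpirationTime(expiryAt)
};

console.log(Object.keys(claims).join(","));
// → iss,sub,aud,iat,exp
```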

Now that you have created a JWT, it is time to make an API call to BigQuery to fetch some data.

5. Make an authenticated request to Google BigQuery

Using the JWT created in the previous step, issue an API request to BigQuery's API to retrieve data from a table.

You will now query the table you created in BigQuery in the Prerequisites section of this tutorial. This example uses a sampled version of the Hacker News Corpus, used under its MIT license and uploaded to BigQuery.

const queryBQ = async (bqJWT, path) => {
  const bqEndpoint = `https://bigquery.googleapis.com${path}`;
  // In this example, text is a field in the BigQuery table that is being queried (hn.news_sampled)
  const query = 'SELECT text FROM hn.news_sampled LIMIT 3';
  const response = await fetch(bqEndpoint, {
    method: "POST",
    body: JSON.stringify({
      "query": query
    }),
    headers: {
      Authorization: `Bearer ${bqJWT}`
    }
  });
  return response.json();
};
...
export default {
  async fetch(request, env, ctx) {
    ...
    let ticketInfo;
    try {
      ticketInfo = await queryBQ(bqJWT, `/bigquery/v2/projects/${env.BQ_PROJECT_ID}/queries`);
    } catch (e) {
      return new Response('An error has occurred while querying BQ', { status: 500 });
    }
    ...
  },
};
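The path argument passed to queryBQ targets BigQuery's jobs.query REST endpoint, built from the project ID stored as a secret. A minimal sketch of how the full URL is assembled (using a placeholder project ID instead of env.BQ_PROJECT_ID):

```javascript
// Build the jobs.query endpoint URL for a given project.
// "my-project" is a placeholder; in the Worker this value comes from env.BQ_PROJECT_ID.
const projectId = "my-project";
const path = `/bigquery/v2/projects/${projectId}/queries`;
const bqEndpoint = `https://bigquery.googleapis.com${path}`;

console.log(bqEndpoint);
// → https://bigquery.googleapis.com/bigquery/v2/projects/my-project/queries
```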

Having the raw row data from BigQuery means that you can now format it into a more usable structure, which is what you will do next.

6. Format results from the query

Now that you have retrieved the data from BigQuery, note that a BigQuery API response looks something like this:

{
  ...
  "schema": {
    "fields": [
      {
        "name": "title",
        "type": "STRING",
        "mode": "NULLABLE"
      },
      {
        "name": "text",
        "type": "STRING",
        "mode": "NULLABLE"
      }
    ]
  },
  ...
  "rows": [
    {
      "f": [
        {
          "v": "<some_value>"
        },
        {
          "v": "<some_value>"
        }
      ]
    },
    {
      "f": [
        {
          "v": "<some_value>"
        },
        {
          "v": "<some_value>"
        }
      ]
    },
    {
      "f": [
        {
          "v": "<some_value>"
        },
        {
          "v": "<some_value>"
        }
      ]
    }
  ],
  ...
}

This format can be difficult to read and to work with when iterating through results, which you will go on to do later in this tutorial. So you will now implement a function that maps the schema onto each individual value; the resulting output, shown below, is easier to read. Each row corresponds to an object within an array.

[
  {
    title: "<some_value>",
    text: "<some_value>",
  },
  {
    title: "<some_value>",
    text: "<some_value>",
  },
  {
    title: "<some_value>",
    text: "<some_value>",
  },
];

Create a formatRows function that takes a number of rows and fields returned from the BigQuery response body and returns an array of results as objects with named fields.

const formatRows = (rowsWithoutFieldNames, fields) => {
  // The position of each value determines which field it should be assigned to.
  const fieldsByIndex = new Map();
  // Load all field names, keyed by their index in the result array
  fields.forEach((field, index) => {
    fieldsByIndex.set(index, field.name);
  });
  // Iterate through rows
  const rowsWithFieldNames = rowsWithoutFieldNames.map(row => {
    // For each row, represented by an array f, iterate through the unnamed values and look up their field names in fieldsByIndex.
    let newRow = {};
    row.f.forEach((field, index) => {
      const fieldName = fieldsByIndex.get(index);
      if (fieldName) {
        // For every field in a row, add it to newRow
        newRow = ({ ...newRow, [fieldName]: field.v });
      }
    });
    return newRow;
  });
  return rowsWithFieldNames;
};

export default {
  async fetch(request, env, ctx) {
    ...
    // Transform output format into array of objects with named fields
    let formattedResults;
    if ('rows' in ticketInfo) {
      formattedResults = formatRows(ticketInfo.rows, ticketInfo.schema.fields);
      console.log(formattedResults);
    } else if ('error' in ticketInfo) {
      return new Response(ticketInfo.error.message, { status: 500 });
    }
    ...
  },
};
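To sanity-check the transformation in isolation, you can run the same formatRows logic outside the Worker against a hand-written fragment that mimics the BigQuery response shape (the field names and values below are just sample data):

```javascript
// Same logic as the formatRows function above, reproduced for a standalone check.
const formatRows = (rowsWithoutFieldNames, fields) => {
  const fieldsByIndex = new Map();
  fields.forEach((field, index) => {
    fieldsByIndex.set(index, field.name);
  });
  return rowsWithoutFieldNames.map((row) => {
    let newRow = {};
    row.f.forEach((field, index) => {
      const fieldName = fieldsByIndex.get(index);
      if (fieldName) {
        newRow = { ...newRow, [fieldName]: field.v };
      }
    });
    return newRow;
  });
};

// Hand-written sample mimicking ticketInfo.schema.fields and ticketInfo.rows.
const fields = [{ name: "title" }, { name: "text" }];
const rows = [
  { f: [{ v: "First post" }, { v: "Hello" }] },
  { f: [{ v: "Second post" }, { v: "World" }] },
];

console.log(formatRows(rows, fields));
// → [ { title: 'First post', text: 'Hello' }, { title: 'Second post', text: 'World' } ]
```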

7. Feed data into Workers AI

Now that you have converted the response from the BigQuery API into an array of results, generate some tags and attach an associated sentiment score using an LLM via Workers AI:

const generateTags = (data, env) => {
  return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
    prompt: `Create three one-word tags for the following text. return only these three tags separated by a comma. don't return text that is not a category.Lowercase only. ${JSON.stringify(data)}`,
  });
};

const generateSentimentScore = (data, env) => {
  return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
    prompt: `return a float number between 0 and 1 measuring the sentiment of the following text. 0 being negative and 1 positive. return only the number, no text. ${JSON.stringify(data)}`,
  });
};

// Iterates through values, sends them to an AI handler and encapsulates all responses into a single Promise
const getAIGeneratedContent = (data, env, aiHandler) => {
  let results = data?.map(dataPoint => {
    return aiHandler(dataPoint, env);
  });
  return Promise.all(results);
};
...
export default {
  async fetch(request, env, ctx) {
    ...
    let summaries, sentimentScores;
    try {
      summaries = await getAIGeneratedContent(formattedResults, env, generateTags);
      sentimentScores = await getAIGeneratedContent(formattedResults, env, generateSentimentScore);
    } catch {
      return new Response('There was an error while generating the text summaries or sentiment scores');
    }

    // Add AI-generated tags and sentiment scores to the formatted results
    formattedResults = formattedResults?.map((formattedResult, i) => {
      if (sentimentScores[i].response && summaries[i].response) {
        return {
          ...formattedResult,
          'sentiment': parseFloat(sentimentScores[i].response).toFixed(2),
          'tags': summaries[i].response.split(',').map((result) => result.trim()),
        };
      }
    });
  },
};
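To see what getAIGeneratedContent does without calling a real model, you can stub env.AI.run with a mock. Everything below with "mock" in its name is hypothetical test scaffolding, not part of the Workers AI API:

```javascript
// Same fan-out logic as above: one handler call per row, awaited together.
const getAIGeneratedContent = (data, env, aiHandler) => {
  let results = data?.map((dataPoint) => {
    return aiHandler(dataPoint, env);
  });
  return Promise.all(results);
};

// Mock env whose AI.run resolves immediately instead of invoking a model.
const mockEnv = {
  AI: {
    run: async (model, { prompt }) => ({ response: `echo:${prompt}` }),
  },
};
const mockHandler = (data, env) =>
  env.AI.run("@cf/meta/llama-3.1-8b-instruct", { prompt: data.text });

getAIGeneratedContent([{ text: "a" }, { text: "b" }], mockEnv, mockHandler).then(
  (responses) => {
    console.log(responses.map((r) => r.response));
    // → [ 'echo:a', 'echo:b' ]
  },
);
```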

Uncomment the following lines from the Wrangler file in your project:

{
  "ai": {
    "binding": "AI"
  }
}

Restart the Worker that is running locally, and after doing so, go to your application endpoint:

Terminal window
curl http://localhost:8787

It is likely that you will be asked to log in to your Cloudflare account and grant temporary access to Wrangler (the Cloudflare CLI) so that it can use your account when calling Workers AI.

Once you access http://localhost:8787 you should see an output similar to the following:

Terminal window
{
  "data": [
    {
      "text": "You can see a clear spike in submissions right around US Thanksgiving.",
      "sentiment": "0.61",
      "tags": [
        "trends",
        "submissions",
        "thanksgiving"
      ]
    },
    {
      "text": "I didn't test the changes before I published them. I basically did development on the running server. In fact for about 30 seconds the comments page was broken due to a bug.",
      "sentiment": "0.35",
      "tags": [
        "software",
        "deployment",
        "error"
      ]
    },
    {
      "text": "I second that. As I recall, it's a very enjoyable 700-page brain dump by someone who's really into his subject. The writing has a personal voice; there are lots of asides, dry wit, and typos that suggest restrained editing. The discussion is intelligent and often theoretical (and Bartle is not scared to use mathematical metaphors), but the tone is not academic.",
      "sentiment": "0.86",
      "tags": [
        "review",
        "game",
        "design"
      ]
    }
  ]
}

The actual values and fields will mostly depend on the query made in Step 5, whose results are then fed into the LLMs.
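The sentiment and tags fields in the output above come from light post-processing of the raw model responses, as done in the final code. With placeholder model outputs:

```javascript
// Placeholder strings standing in for sentimentScores[i].response and summaries[i].response.
const sentimentResponse = "0.6123";
const tagsResponse = "trends, submissions, thanksgiving";

// Sentiment is parsed to a float and fixed to two decimals (note: toFixed returns a string).
const sentiment = parseFloat(sentimentResponse).toFixed(2);
// Tags are split on commas and trimmed.
const tags = tagsResponse.split(",").map((t) => t.trim());

console.log(sentiment); // → 0.61
console.log(tags); // → [ 'trends', 'submissions', 'thanksgiving' ]
```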

Final result

All the code shown in the different steps is combined into the following code in src/index.js:

import * as jose from "jose";

const generateBQJWT = async (env) => {
  const algorithm = "RS256";
  const audience = "https://bigquery.googleapis.com/";
  const expiryAt = new Date().valueOf() / 1000;
  const privateKey = await jose.importPKCS8(env.BQ_PRIVATE_KEY, algorithm);

  // Generate signed JSON Web Token (JWT)
  return new jose.SignJWT()
    .setProtectedHeader({
      typ: "JWT",
      alg: algorithm,
      kid: env.BQ_PRIVATE_KEY_ID,
    })
    .setIssuer(env.BQ_CLIENT_EMAIL)
    .setSubject(env.BQ_CLIENT_EMAIL)
    .setAudience(audience)
    .setExpirationTime(expiryAt)
    .setIssuedAt()
    .sign(privateKey);
};

const queryBQ = async (bqJWT, path) => {
  const bqEndpoint = `https://bigquery.googleapis.com${path}`;
  const query = "SELECT text FROM hn.news_sampled LIMIT 3";
  const response = await fetch(bqEndpoint, {
    method: "POST",
    body: JSON.stringify({
      query: query,
    }),
    headers: {
      Authorization: `Bearer ${bqJWT}`,
    },
  });
  return response.json();
};

const formatRows = (rowsWithoutFieldNames, fields) => {
  // Index to fieldName
  const fieldsByIndex = new Map();
  fields.forEach((field, index) => {
    fieldsByIndex.set(index, field.name);
  });
  const rowsWithFieldNames = rowsWithoutFieldNames.map((row) => {
    // Map rows into an array of objects with field names
    let newRow = {};
    row.f.forEach((field, index) => {
      const fieldName = fieldsByIndex.get(index);
      if (fieldName) {
        newRow = { ...newRow, [fieldName]: field.v };
      }
    });
    return newRow;
  });
  return rowsWithFieldNames;
};

const generateTags = (data, env) => {
  return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
    prompt: `Create three one-word tags for the following text. return only these three tags separated by a comma. don't return text that is not a category.Lowercase only. ${JSON.stringify(data)}`,
  });
};

const generateSentimentScore = (data, env) => {
  return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
    prompt: `return a float number between 0 and 1 measuring the sentiment of the following text. 0 being negative and 1 positive. return only the number, no text. ${JSON.stringify(data)}`,
  });
};

const getAIGeneratedContent = (data, env, aiHandler) => {
  let results = data?.map((dataPoint) => {
    return aiHandler(dataPoint, env);
  });
  return Promise.all(results);
};

export default {
  async fetch(request, env, ctx) {
    // Create JWT to authenticate the BigQuery API call
    let bqJWT;
    try {
      bqJWT = await generateBQJWT(env);
    } catch (error) {
      console.log(error);
      return new Response("An error has occurred while generating the JWT", {
        status: 500,
      });
    }

    // Fetch results from BigQuery
    let ticketInfo;
    try {
      ticketInfo = await queryBQ(
        bqJWT,
        `/bigquery/v2/projects/${env.BQ_PROJECT_ID}/queries`,
      );
    } catch (error) {
      console.log(error);
      return new Response("An error has occurred while querying BQ", {
        status: 500,
      });
    }

    // Transform output format into array of objects with named fields
    let formattedResults;
    if ("rows" in ticketInfo) {
      formattedResults = formatRows(ticketInfo.rows, ticketInfo.schema.fields);
    } else if ("error" in ticketInfo) {
      return new Response(ticketInfo.error.message, { status: 500 });
    }

    // Generate AI summaries and sentiment scores
    let summaries, sentimentScores;
    try {
      summaries = await getAIGeneratedContent(
        formattedResults,
        env,
        generateTags,
      );
      sentimentScores = await getAIGeneratedContent(
        formattedResults,
        env,
        generateSentimentScore,
      );
    } catch {
      return new Response(
        "There was an error while generating the text summaries or sentiment scores",
      );
    }

    // Add AI summaries and sentiment scores to previous results
    formattedResults = formattedResults?.map((formattedResult, i) => {
      if (sentimentScores[i].response && summaries[i].response) {
        return {
          ...formattedResult,
          sentiment: parseFloat(sentimentScores[i].response).toFixed(2),
          tags: summaries[i].response.split(",").map((result) => result.trim()),
        };
      }
    });

    const response = { data: formattedResults };
    return new Response(JSON.stringify(response), {
      headers: { "Content-Type": "application/json" },
    });
  },
};

If you wish to deploy this Worker, you can do so by running npx wrangler deploy:

Terminal window
Total Upload: <size_of_your_worker> KiB / gzip: <compressed_size_of_your_worker> KiB
Uploaded <name_of_your_worker> (x sec)
Deployed <name_of_your_worker> triggers (x sec)
https://<your_public_worker_endpoint>
Current Version ID: <worker_script_version_id>

This will create a public endpoint that you can use to access the Worker globally. Please keep this in mind when using production data, and make sure to put additional access controls in place.

Conclusion

In this tutorial, you have learnt how to integrate Google BigQuery and Cloudflare Workers by creating a GCP service account key and storing part of it as Worker secrets. These secrets were then imported into the code and, using the jose npm library, you created a JSON Web Token to authenticate the API query to BigQuery.

Once you obtained the results, you formatted them to later be passed to generative AI models via Workers AI to generate tags and to perform sentiment analysis on the extracted data.

Next Steps

If, instead of displaying the results of feeding the data to the AI models in a browser, your workflow requires fetching and storing data (for example, in R2 or D1) at regular intervals, you may want to consider adding a scheduled handler to this Worker. It lets you trigger the Worker on a predefined cadence via a Cron Trigger. Also consider reviewing the Reference Architecture Diagrams on Ingesting BigQuery Data into Workers AI.
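A scheduled handler can be sketched as follows. The cadence itself is configured in the Wrangler file as a Cron Trigger, not in code, and the cron expression mentioned below is a hypothetical example; in your actual src/index.js this object would be the Worker's default export:

```javascript
// Sketch: a Worker object with both a fetch and a scheduled handler.
// In src/index.js this object would be exported with `export default worker;`.
const worker = {
  async fetch(request, env, ctx) {
    return new Response("OK");
  },
  async scheduled(event, env, ctx) {
    // Re-run the BigQuery ingestion and Workers AI pipeline here, persisting
    // the results (for example to R2 or D1) instead of returning them.
    console.log(`cron fired: ${event.cron}`);
  },
};

// Example invocation of the scheduled handler with a mock event,
// as if a Cron Trigger of "0 */6 * * *" had fired.
worker.scheduled({ cron: "0 */6 * * *" }, {}, {});
```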

Another use case for ingesting data from external sources, as you did in this tutorial, is building a RAG system. If that sounds relevant to you, please check out the tutorial Build a Retrieval Augmented Generation (RAG) AI.

To learn more about what other AI models you can use at Cloudflare, please visit the Workers AI section of our docs.