在 BigQuery 中查询报告日志的示例

Cloud Identity 专业版提供此功能。版本对比

您可以在 BigQuery 中查询常见报告,本文列出了一些查询示例。这些查询示例假设您使用旧版 SQL。将 api_project_name.dataset_name 替换为您自己的项目名称和数据集名称。

详细了解 BigQuery 数据查询

如需了解 Gmail 日志字段及其含义,请参阅 BigQuery 中的 Gmail 活动日志架构

查询示例

账号

管理员账号和委托账号的数量,以及已停用、锁定和暂停的账号数量(按日期显示)

SELECT date,
accounts.num_locked_users,
accounts.num_disabled_accounts,
accounts.num_delegated_admins,
accounts.num_super_admins,
accounts.num_suspended_users,
accounts.num_users
FROM api_project_name.dataset_name.usage
WHERE accounts.num_users IS NOT NULL
ORDER BY date ASC;

管理员

管理员执行最频繁的活动

SELECT count(*) as admin_actions, event_name
FROM api_project_name.dataset_name.activity
WHERE email IN (
  SELECT user_email
  FROM api_project_name.dataset_name.usage
  WHERE accounts.is_super_admin = TRUE
)
GROUP BY 2
ORDER BY 1 DESC;

查询特定网域中的超级用户数量

SELECT COUNT(DISTINCT user_email) as number_of_super_admins, date
FROM api_project_name.dataset_name.usage
WHERE accounts.is_super_admin = TRUE
GROUP BY 2
ORDER BY 2 DESC;

日历

仅限标准 SQL

Google 日历中每日活跃用户占 30 天内活跃用户的比例。以下是在多个表中进行查询的示例。

每日活跃用户数量

SELECT date, calendar.num_1day_active_users
FROM api_project_name.dataset_name.usage
WHERE calendar.num_1day_active_users IS NOT NULL
ORDER BY date DESC

过去 30 天活跃用户数量

SELECT date, calendar.num_30day_active_users
FROM api_project_name.dataset_name.usage
WHERE calendar.num_30day_active_users IS NOT NULL
ORDER BY date DESC;

日历活动数量(按类型显示)

SELECT COUNT(DISTINCT calendar.calendar_id) AS count, event_name
FROM api_project_name.dataset_name.activity
WHERE calendar.calendar_id IS NOT NULL
GROUP BY 2 ORDER BY 1 DESC;

云端硬盘

共享的 Google 云端硬盘内容数量(按共享方式分组)

SELECT COUNT(DISTINCT drive.doc_id) AS count, drive.visibility
FROM api_project_name.dataset_name.activity
WHERE drive.doc_id IS NOT NULL
GROUP BY 2 ORDER BY 1 DESC;

文件 ID、标题、所有者和类型。在特定时间范围内对外共享的文件。

SELECT TIMESTAMP_MICROS(time_usec) AS date, drive.doc_id, drive.doc_title,
drive.owner, drive.doc_type
FROM api_project_name.dataset_name.activity
WHERE drive.visibility = "shared_externally"
ORDER BY 1 DESC
LIMIT 100;

共享权限变更及其结果。可让您了解哪些权限变更造成了文件公开范围的变化。

SELECT TIMESTAMP_MICROS(time_usec) AS date, drive.doc_title,
drive.visibility_change,drive.old_visibility, drive.visibility,
FROM api_project_name.dataset_name.activity
WHERE record_type = "drive"
AND drive.old_visibility IS NOT NULL
AND drive.old_visibility != "unknown";

按文件类型细分的事件类型。适用于按文件类型分类的采用度报告。

SELECT drive.doc_type, event_type, count(*) 
FROM  api_project_name.dataset_name.activity
WHERE record_type = "DRIVE"
GROUP by 1,2 ORDER BY 3 desc;

事件类型以及各个共享云端硬盘的名称

SELECT drive.shared_drive_id, event_type, event_name, record_type,
count(distinct drive.doc_id) AS count
FROM api_project_name.dataset_name.activity
WHERE record_type = "drive"
AND drive.shared_drive_id IS NOT NULL
GROUP BY 1,2,3,4 ORDER BY 5 DESC;

网域外部用户的相关信息

SELECT email, event_name, count(*) AS count
FROM api_project_name.dataset_name.activity
WHERE email != ""
AND email NOT LIKE "%mydomain.com%"
GROUP BY 1,2 ORDER BY 3 DESC;

授予外部用户的权限发生了何种变更以及变更时间

SELECT drive.target_user, event_name, count(*) AS count
FROM api_project_name.dataset_name.activity
WHERE drive.target_user IS NOT NULL
AND drive.target_user NOT LIKE "%mydomain.com%"
GROUP BY 1,2 ORDER BY 3 DESC;

关于存储空间的监控信息

适用于在设定了云端硬盘存储空间阈值(使用 AND accounts.drive_used_quota_in_mb > 0 子句定义)的情况下,针对云端硬盘存储空间用量超过 X 的用户生成报告。

此查询可定义为计划查询,或者举例来说,可以使用 API 定期调用。

SELECT date,
user_email,
accounts.drive_used_quota_in_mb,
FROM api_project_name.dataset_name.usage
WHERE accounts.drive_used_quota_in_mb IS NOT NULL
AND accounts.drive_used_quota_in_mb > 0
AND user_email != ""
AND date = CAST(DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) AS STRING)
ORDER BY 3,1 DESC;

注意:

  • 您可以根据客户设置的过滤条件修改此值。例如,如果存储空间用量超过 15GB,则可使用:AND accounts.drive_used_quota_in_mb > 15000
  • 使用日期比较语句 CAST(DATE_SUB(CURRENT_DATE(), INTERVAL x DAY) AS STRING),就能以日期值支持的格式进行日期比较。
  • 此查询也适用于 Gmail,我们可以在其中找到类似的值:accounts.gmail_used_quota_in_mb

Gmail

在 BigQuery 中查询 Gmail 数据的最佳做法

  • 仅查询您需要的数据。以下示例设置的匹配记录上限为 1000 条,但您可以自行设置上限。
  • 为您的查询设置时间范围。时间范围通常设置为一天。

与主题匹配
创建一个显示邮件摘要的视图,用于查询与指定主题相符的记录,但最多不超过 1000 条

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address,
     gmail.message_info.rfc2822_message_id
FROM your_dataset_id.activity
WHERE gmail.message_info.subject LIKE "%test%"
LIMIT 1000

与收件人匹配
统计指定收件人的非重复邮件数量

SELECT COUNT(DISTINCT gmail.message_info.rfc2822_message_id)
FROM your_dataset_id.activity d
WHERE
  EXISTS(
   SELECT 1 FROM d.gmail.message_info.destination WHERE destination.address = "recipient@example.com")

与处理方式和收件人匹配

创建一个显示邮件摘要的视图,用于查询与以下两项条件均相符的记录,但最多不超过 1000 条:

  • 指定的处理方式(修改、拒绝、隔离)
  • 指定的收件人

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address as source,
     destination.address as destination,
     gmail.message_info.rfc2822_message_id
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE
     destination.address = "recipient@example.com" AND
     EXISTS(SELECT 1 FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence
          WHERE consequence.action = 17)
LIMIT 1000

触发的规则说明
创建一个显示邮件摘要的视图,用于查询触发了指定规则说明的记录,但最多不超过 1000 条

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address as source,
     destination.address as destination,
     gmail.message_info.rfc2822_message_id
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE
    EXISTS(SELECT 1 FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence
       WHERE consequence.reason LIKE '%description%')
LIMIT 1000

已标为垃圾邮件
创建一个显示邮件摘要的视图,用于查询与以下条件相符的记录,但最多不超过 1000 条:

  • 已标为垃圾邮件
  • 发给指定的收件人
  • 因为各种原因被标为垃圾内容的邮件

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address as source,
     destination.address as destination,
     gmail.message_info.rfc2822_message_id
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE gmail.message_info.is_spam AND
          destination.address = "recipient@example.com"
LIMIT 1000

加密协议 - 未加密
创建一个显示邮件摘要的视图,用于查询加密协议为“未加密”的记录:

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
gmail.message_info.subject,
gmail.message_info.source.address as source,
destination.address as destination,
gmail.message_info.rfc2822_message_id
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE gmail.message_info.connection_info.smtp_tls_state = 0
LIMIT 1000

加密协议 - 仅采用 TLS
创建一个显示邮件摘要的视图,用于查询加密协议为“仅采用 TLS”的记录

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address as source,
     destination.address as destination,
     gmail.message_info.rfc2822_message_id
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE gmail.message_info.connection_info.smtp_tls_state = 1
LIMIT 1000

与邮件 ID 匹配
创建一个显示邮件详细信息的视图,用于根据指定的邮件 ID 来查询邮件(在邮件 ID 两边需加上“<>”)

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
gmail.event_info.success,
gmail.event_info.elapsed_time_usec,
gmail.message_info.subject,
gmail.message_info.source.address as source,
gmail.message_info.source.service as source_service,
gmail.message_info.source.selector as source_selector,
destination.address as destination,
destination.service,
destination.selector as destination_selector,
gmail.message_info.rfc2822_message_id,
gmail.message_info.payload_size,
gmail.message_info.num_message_attachments,
gmail.message_info.connection_info.smtp_tls_state,
gmail.message_info.description
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE gmail.message_info.rfc2822_message_id = ""
LIMIT 1000

处理方式 - 拒绝邮件
拒绝邮件:

  • 是依据什么规则拒绝的?

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address as source,
     destination.address as destination,
     gmail.message_info.rfc2822_message_id,
     (SELECT ARRAY_AGG(consequence.reason)
     FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence)
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE gmail.message_info.rfc2822_message_id = "<message id>" AND
     EXISTS(SELECT 1 FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence
        WHERE consequence.action = 17)
LIMIT 1000

处理方式 - 修改邮件
修改邮件:

  • 是依据什么规则修改的?
  • 具体修改的是哪个子类别(例如标头或主题)?

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address as source,
     destination.address as destination,
     gmail.message_info.rfc2822_message_id,
     (SELECT ARRAY_AGG((consequence.action, consequence.reason))
     FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence)
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE gmail.message_info.rfc2822_message_id = "<message id>" AND
   EXISTS(SELECT 1 FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence
       WHERE consequence.action NOT IN (0, 17, 3))
LIMIT 1000

隔离邮件
是依据什么规则隔离邮件的?

SELECT TIMESTAMP_MICROS(gmail.event_info.timestamp_usec) as timestamp,
     gmail.message_info.subject,
     gmail.message_info.source.address as source,
     destination.address as destination,
     gmail.message_info.rfc2822_message_id,
     (SELECT ARRAY_AGG(consequence.reason)
     FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence)
FROM your_dataset_id.activity d, d.gmail.message_info.destination
WHERE gmail.message_info.rfc2822_message_id = "<message id>" AND
     EXISTS(SELECT 1 FROM d.gmail.message_info.triggered_rule_info ri, ri.consequence
        WHERE consequence.action = 3)
LIMIT 1000

复合查询
统计在过去 30 天内符合特定规则(即所谓的“规则说明”)的所有邮件总数:

SELECT
  COUNT(gmail.message_info.rfc2822_message_id) AS message_cnt
FROM
  `your_dataset_id.activity`,
UNNEST (gmail.message_info.triggered_rule_info) AS triggered_rule
WHERE
  _PARTITIONTIME >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
  AND triggered_rule.rule_name LIKE "rule description"

列出最近 1 天内收到的未经 TLS 加密的所有邮件:

SELECT gmail.message_info.subject,
    gmail.message_info.rfc2822_message_id
FROM `your_dataset_id.activity`
WHERE
    _PARTITIONTIME >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY) AND
    gmail.message_info.connection_info.smtp_tls_state = 0

列出过去 30 天内与我的账号往来邮件最频繁的 10 个网域:

SELECT
  COUNT(DISTINCT gmail.message_info.rfc2822_message_id) as message_cnt,
  IF(gmail.message_info.is_policy_check_for_sender, 
       REGEXP_EXTRACT(gmail.message_info.source.address , "(@.*)"),
       REGEXP_EXTRACT(destination.address , "(@.*)")) AS domain
FROM `your_dataset_id.activity` d, d.gmail.message_info.destination
WHERE 
    _PARTITIONTIME >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY domain
ORDER BY message_cnt desc
LIMIT 10

Gmail 的每日活跃用户数量占 30 天内活跃用户数量的比例

每日活跃用户数量:

SELECT date,
gmail.num_1day_active_users
FROM api_project_name.dataset_name.usage
WHERE gmail.num_1day_active_users > 0
ORDER BY 1 DESC;

过去 7 天内的活跃用户数量:

SELECT date,
gmail.num_7day_active_users
FROM api_project_name.dataset_name.usage
WHERE gmail.num_7day_active_users > 0
ORDER BY 1 DESC;

过去 30 天内的活跃用户数量:

SELECT date,
gmail.num_30day_active_users
FROM api_project_name.dataset_name.usage
WHERE gmail.num_30day_active_users > 0
ORDER BY 1 DESC;

与电子邮件关联的至少 1 个分类标签的最近 100 个 Gmail 日志事件

SELECT
  resource_details[OFFSET(0)].id AS MESSAGE_ID,
  gmail.message_info.subject AS SUBJECT,
  gmail.event_info.mail_event_type AS MAIL_EVENT_TYPE,
  gmail.message_info.source.address AS SENDER,
  resource_details[OFFSET(0)].applied_labels AS LABELS
FROM workspace_audit_logs.activity
WHERE gmail.event_info.mail_event_type > 0 and ARRAY_LENGTH(resource_details) > 0
ORDER by time_usec desc
LIMIT 100;

特定电子邮件的所有可用日志事件

SELECT
  gmail.event_info,
  gmail.message_info,
  resource_details
FROM workspace_audit_logs.activity
WHERE gmail.message_info.rfc2822_message_id = "<XYZ>"
ORDER by time_usec desc;

群组

Google 群组成员资格变更和用户行为

SELECT TIMESTAMP_MICROS(time_usec) AS date,
  event_name,
  admin.group_email,
  event_type,
  email,
  record_type,
  admin.user_email,
  admin.new_value,
  admin.old_value,
  admin.setting_name
FROM project_name.dataset_name.activity
WHERE `admin`.group_email IS NOT NULL
AND CONCAT(TIMESTAMP_MICROS(time_usec)) LIKE "%YYYY-MM-DD%"
ORDER BY 1 DESC
LIMIT
  1000

如果您希望采用 YYYY-MM-DD 格式的时间戳,则可将第一个 SELECT 语句元素替换为以下代码:
EXTRACT(DATE FROM TIMESTAMP_MICROS(time_usec)) AS date,

您可以通过以下两种方式之一使用 WHERE 子句过滤日期:

SELECT TIMESTAMP_MICROS(time_usec) AS date,
  event_name,
  admin.group_email,
  event_type,
  email,
  record_type,
  admin.user_email,
  admin.new_value,
  admin.old_value,
  admin.setting_name
FROM project_name.dataset_name.activity
WHERE `admin`.group_email IS NOT NULL
AND EXTRACT(DATE FROM TIMESTAMP_MICROS(time_usec)) > "2020-06-30"
AND EXTRACT(DATE FROM TIMESTAMP_MICROS(time_usec)) < "2020-08-31"
ORDER BY 1 DESC
LIMIT
  1000

SELECT TIMESTAMP_MICROS(time_usec) AS date,
  event_name,
  admin.group_email,
  event_type,
  email,
  record_type,
  admin.user_email,
  admin.new_value,
  admin.old_value,
  admin.setting_name
FROM project_name.dataset_name.activity
WHERE `admin`.group_email IS NOT NULL
AND TIMESTAMP_MICROS(time_usec) > TIMESTAMP("2020-07-21")
AND TIMESTAMP_MICROS(time_usec) < TIMESTAMP("2020-07-23")
ORDER BY 1 DESC
LIMIT
  1000

Google Meet

视频通话数量和总通话分钟数(按日期显示)

SELECT date, meet.num_calls, meet.total_call_minutes
FROM `api_project_name.dataset_name.usage`
WHERE meet.num_calls IS NOT NULL
ORDER BY date ASC

每日活跃用户数量

SELECT date, meet.num_1day_active_users
FROM `api_project_name.dataset_name.usage`
WHERE meet.num_1day_active_users IS NOT NULL
ORDER BY date DESC

过去 30 天活跃用户数量

SELECT date, meet.num_30day_active_users
FROM `api_project_name.dataset_name.usage`
WHERE meet.num_30day_active_users IS NOT NULL
ORDER BY date DESC

规则

触发的数据泄露防护规则(按名称、匹配的应用和操作显示)

SELECT TIMESTAMP_MICROS(time_usec) AS date, rules.rule_name, rules.application,
rules.resource_title, rules.actions, rules.resource_owner_email,
rules.data_source, rules.matched_trigger
FROM api_project_name.dataset_name.activity
WHERE rules.rule_name IS NOT NULL
ORDER BY 1 DESC LIMIT 1000;

令牌

授权第三方应用访问 Google 云端硬盘的次数

SELECT token.client_id, scope, token.app_name, count(*) AS count
FROM api_project_name.dataset_name.activity
LEFT JOIN UNNEST(token.scope) AS scope
WHERE scope LIKE "%drive%"
GROUP BY 1,2,3 ORDER BY 4 DESC;

尝试登录管理控制台的事件

Google 管理控制台登录失败事件的详细信息

SELECT TIMESTAMP_MICROS(time_usec) AS date, email, ip_address,
event_name, login.login_type, login.login_failure_type
FROM api_project_name.dataset_name.activity
WHERE login.login_type IS NOT NULL
AND login.login_failure_type IS NOT NULL
AND event_type = "login"
ORDER BY date DESC;

使用情况表的架构

表架构会变化。您可以在报告 API 参考文档中查看参数和字段的最新完整列表。

按日期过滤

在对活动情况表或使用情况表执行查询时,您可以按日期过滤结果。这两种表采用不同的日期显示格式:

  • 活动情况表以 Unix 微秒数为单位存储时间戳。该时间戳是一个整数值(数字),可使用 TIMESTAMP_MICROS() 函数转换为日期。
  • 使用情况表以日期格式显示 date 值,因此不需要进行转换。

对于以上任一表格,您都可以使用以下方法来按特定日期(或日期范围)进行过滤。

活动情况表

如果要以 Unix 微秒数格式(活动情况表)按特定日期进行过滤,您可以定义 WHERE 子句和 TIMESTAMP() 函数,使用大于 (>) 和小于 (<) 运算符执行简单的比较:

SELECT TIMESTAMP_MICROS(time_usec) as date, record_type
FROM api_project_name.dataset_name.activity
WHERE TIMESTAMP_MICROS(time_usec) > TIMESTAMP("2020-07-01")
AND TIMESTAMP_MICROS(time_usec) < TIMESTAMP("2020-07-07")
ORDER BY 1 DESC LIMIT 1000

此处的思路是将函数 TIMESTAMP_MICROS() 的返回值与函数 TIMESTAMP()(在其中添加字符串类型的日期参数)的返回值进行比较,从而为输入值 time_usec 设定限制。这样就可以遵循标准 SQL 中的时间戳函数的标准,采用简单的比较运算符 (>) 和 (<),同时利用 AND 扩展 WHERE 子句,从而限定时间范围。

使用情况表

SELECT date, meet.num_calls,
FROM api_project_name.dataset_name.usage
WHERE meet.num_calls IS NOT NULL
AND TIMESTAMP(date) > TIMESTAMP("2020-07-01")
AND TIMESTAMP(date) < TIMESTAMP("2020-07-07")
ORDER BY date DESC;

我们可以将表中显示的字符串类型 date 值传递到函数 TIMESTAMP() 中,并且像第一个示例那样使用比较运算符 (>) 和 (<)。

 

按网域别名和子域名过滤:排除和包含

要在查询结果中包含(或排除)特定域名,可以在 WHERE 子句中添加电子邮件地址过滤条件,然后使用通配符 (%) 来过滤域名。

如何使用 ANDOR 语句取决于您是要滤除(排除)还是要仅包含特定结果

从结果中排除特域名

WHERE email NOT LIKE ("%@sub.%")
AND email NOT LIKE ("%@test.%")

仅在结果中包含特定域名

WHERE email LIKE ("%@sub.%")
OR email LIKE ("%@test.%")

规则审核日志

此查询可用于跟踪用户尝试共享敏感数据的事件

SELECT TIMESTAMP_MICROS(time_usec) AS Date,
rules.resource_owner_email AS User,
rules.rule_name AS ruleName,
rules.rule_type AS ruleType,
rules.rule_resource_name AS ruleResourceName,
rules.resource_id AS resourceId,
rules.resource_title AS resourceTitle,
rules.resource_type AS resourceType,
rules.resource_owner_email AS resourceOwner,
CAST(recipients AS STRING) AS Recipients,
rules.data_source AS dataSource,
rules.actor_ip_address AS actorIpAddress,
rules.severity AS severity,
rules.scan_type AS scanType,
rules.matched_trigger AS matchedTriggers,
detect.display_name AS matchedDetectorsName,
detect.detector_id AS matchedDetectorsId,
detect.detector_type AS matchedDetectorsType,
triggers.action_type AS triggeredActions,
suppressors.action_type AS suppressedActions,
FROM api_project_name.dataset_name.activity
LEFT JOIN UNNEST(rules.resource_recipients) as recipients
LEFT JOIN UNNEST(rules.matched_detectors) as detect
LEFT JOIN UNNEST(rules.triggered_actions) as triggers
LEFT JOIN UNNEST(rules.suppressed_actions) as suppressors
WHERE rules.rule_name IS NOT NULL
AND triggers.action_type != "ALERT"
ORDER BY 1 DESC
LIMIT 1000;

相关主题

该内容对您有帮助吗?

您有什么改进建议?
搜索
清除搜索内容
关闭搜索框
主菜单
9276127914906723816
true
搜索支持中心
true
true
true
false
false