2.8. 创建功能,将过滤的数据发布到您的存储桶

创建一个过滤数据的功能,并将其添加到您创建的存储帐户中,以便与红帽共享。您可以使用示例 Python 脚本从与红帽费用相关的成本导出中收集成本数据,并将其添加到存储帐户中。此脚本会过滤您使用 BigQuery 创建的成本数据,删除非红帽信息,然后创建 .csv 文件,将其存储在您创建的存储桶中,并将数据发送到红帽。

流程

  1. Google Cloud Console 中,搜索 secret 并选择 Secret Manager 结果来设置一个 secret,以便向红帽验证您的函数,而无需将凭证存储在函数中。

    1. 在 Secret Manager 页面中,点 Create Secret
    2. 为您的 secret 命名,添加您的红帽用户名,然后点 Create Secret
    3. 重复此过程,为您的红帽密码保存 secret。
  2. 在 Google Cloud Console 搜索栏中,搜索功能并选择 Cloud Functions 结果。
  3. Cloud Functions 页面上,单击 Create function
  4. 为函数命名。在本例中,使用 customer-data-function。
  5. Trigger 部分,点 Save 接受 HTTP Trigger 类型。
  6. Runtime, build, connections and security settings 中,单击 Security and image repository,引用您创建的 secret,点 Done,然后点 Next
  7. Cloud Functions Code 页面中,将运行时设置为 Python 3.9
  8. 打开 requirements.txt 文件。将下面几行粘贴到文件末尾。

    requests
    google-cloud-bigquery
    google-cloud-storage
  9. 打开 main.py 文件。

    1. Entry Point 设置为 get_filtered_data
    2. 粘贴以下 Python 脚本。将标记为 # Required vars 的部分中的值更改为您的环境的值。

      import csv
      import datetime
      import uuid
      import os
      import requests
      from google.cloud import bigquery
      from google.cloud import storage
      from itertools import islice
      from dateutil.relativedelta import relativedelta
      
      # Number of trailing days of cost data to query on each run.
      query_range = 5
      now = datetime.datetime.now()
      # Start of the query window: `query_range` days before now.
      delta = now - relativedelta(days=query_range)
      year = now.strftime("%Y")
      month = now.strftime("%m")
      day = now.strftime("%d")
      # Unique per-invocation object prefix for the bucket, e.g. "2024/01/05/<uuid4>".
      report_prefix=f"{year}/{month}/{day}/{uuid.uuid4()}"
      
      # Required vars to update
      USER = os.getenv('username')         # Cost management username
      PASS = os.getenv('password')         # Cost management password
      INTEGRATION_ID = "<integration_id>"  # Cost management integration_id
      BUCKET = "<bucket>"                  # Filtered data GCP Bucket
      PROJECT_ID = "<project_id>"          # Your project ID
      DATASET = "<dataset>"                # Your dataset name
      TABLE_ID = "<table_id>"              # Your table ID
      
      # Columns selected from the BigQuery billing-export table; this order also
      # defines the column order of the generated CSV files.
      gcp_big_query_columns = [
          "billing_account_id",
          "service.id",
          "service.description",
          "sku.id",
          "sku.description",
          "usage_start_time",
          "usage_end_time",
          "project.id",
          "project.name",
          "project.labels",
          "project.ancestry_numbers",
          "labels",
          "system_labels",
          "location.location",
          "location.country",
          "location.region",
          "location.zone",
          "export_time",
          "cost",
          "currency",
          "currency_conversion_rate",
          "usage.amount",
          "usage.unit",
          "usage.amount_in_pricing_units",
          "usage.pricing_unit",
          "credits",
          "invoice.month",
          "cost_type",
          "resource.name",
          "resource.global_name",
      ]
      # Fully qualified table name: "<project>.<dataset>.<table>".
      table_name = ".".join([PROJECT_ID, DATASET, TABLE_ID])
      
      # Maximum number of result rows written into a single CSV part file.
      BATCH_SIZE = 200000
      
      def batch(iterable, n):
          """Yield successive tuples of up to *n* items from *iterable*.

          The final tuple may be shorter than *n*; an empty iterable yields nothing.
          """
          iterator = iter(iterable)
          while True:
              piece = tuple(islice(iterator, n))
              if not piece:
                  return
              yield piece
      
      def build_query_select_statement():
          """Assemble the comma-separated column list for the BigQuery SELECT clause."""
          json_columns = ("labels", "system_labels", "project.labels")
          selected = []
          for column in gcp_big_query_columns:
              # Repeated/struct columns are serialized to JSON strings so they
              # fit into a single CSV cell.
              if column in json_columns:
                  selected.append(f"TO_JSON_STRING({column})")
              else:
                  selected.append(column)
          selected.append("DATE(_PARTITIONTIME) as partition_date")
          return ",".join(selected)
      
      def create_reports(query_date):
          """Query one partition day of Red Hat cost rows and upload them as CSV parts.

          :param query_date: datetime.date of the partition to export.
          :return: list of object names (str) written to the bucket.
          """
          # Fixes vs. the original query string:
          #  - the date literal is quoted: unquoted, `= 2024-01-05` is parsed by
          #    BigQuery as integer subtraction, not a DATE;
          #  - the LIKE conditions are parenthesized: AND binds tighter than OR,
          #    so the partition-date filter previously applied only to the first
          #    LIKE condition.
          query = (
              f"SELECT {build_query_select_statement()} FROM {table_name} "
              f"WHERE DATE(_PARTITIONTIME) = '{query_date}' "
              "AND (sku.description LIKE '%RedHat%' "
              "OR sku.description LIKE '%Red Hat%' "
              "OR service.description LIKE '%Red Hat%') "
              "ORDER BY usage_start_time"
          )
          client = bigquery.Client()
          query_job = client.query(query).result()
          column_list = gcp_big_query_columns.copy()
          column_list.append("partition_date")
          daily_files = []
          storage_client = storage.Client()
          bucket = storage_client.bucket(BUCKET)
          # Split the result set into BATCH_SIZE-row CSV parts and upload each
          # part directly to the bucket via a streaming blob writer.
          for i, rows in enumerate(batch(query_job, BATCH_SIZE)):
              csv_file = f"{report_prefix}/{query_date}_part_{str(i)}.csv"
              daily_files.append(csv_file)
              blob = bucket.blob(csv_file)
              with blob.open(mode='w') as f:
                  writer = csv.writer(f)
                  writer.writerow(column_list)
                  writer.writerows(rows)
          return daily_files
      
      def post_data(files_list):
          """POST the uploaded CSV object names to the cost-management ingress API.

          :param files_list: list of bucket object names produced by create_reports.
          :return: the requests.Response from console.redhat.com.
          """
          ingress_url = "https://console.redhat.com/api/cost-management/v1/ingress/reports/"
          payload = {
              "source": INTEGRATION_ID,
              "reports_list": files_list,
              "bill_year": year,
              "bill_month": month,
          }
          return requests.post(ingress_url, json=payload, auth=(USER, PASS))
      
      def get_filtered_data(request):
          """Cloud Function entry point: export the last query_range days and post them.

          :param request: the HTTP request object supplied by Cloud Functions (unused).
          :return: a status string embedding the ingress API response.
          """
          reports = []
          for offset in range(query_range):
              day_to_query = (delta + datetime.timedelta(days=offset)).date()
              reports.extend(create_reports(day_to_query))
          response = post_data(reports)
          return f'Files posted! {response}'
  10. Deploy