第 20 章 开发 PTP 事件消费者应用程序

在裸机集群节点上开发使用 Precision Time Protocol (PTP) 事件的消费者应用程序时,您需要在单独的应用程序 pod 中部署消费者应用程序和 cloud-event-proxy 容器。cloud-event-proxy 容器从 PTP Operator pod 接收事件,并将其传递给消费者应用程序。消费者应用使用 REST API 订阅 cloud-event-proxy 容器中发布的事件。

有关部署 PTP 事件通知的更多信息,请参阅关于 PTP 快速事件通知框架

注意

以下信息提供了开发使用 PTP 事件的消费者应用程序的一般指导。完整的事件消费者应用示例超出了此信息的范围。

20.1. PTP 事件消费者应用程序参考

PTP 事件消费者应用程序需要以下功能:

  1. 使用 POST 处理程序运行的 Web 服务,以接收云原生 PTP 事件 JSON 有效负载
  2. 订阅 PTP 事件制作者的 createSubscription 功能
  3. getCurrentState 功能轮询 PTP 事件制作者的当前状态

以下示例 Go 片断演示了这些要求:

Go 中的 PTP 事件消费者服务器功能示例

func server() {
  http.HandleFunc("/event", getEvent)
  http.ListenAndServe("localhost:8989", nil)
}

func getEvent(w http.ResponseWriter, req *http.Request) {
  defer req.Body.Close()
  bodyBytes, err := io.ReadAll(req.Body)
  if err != nil {
    log.Errorf("error reading event %v", err)
  }
  e := string(bodyBytes)
  if e != "" {
    processEvent(bodyBytes)
    log.Infof("received event %s", string(bodyBytes))
  } else {
    w.WriteHeader(http.StatusNoContent)
  }
}

Go 中的 PTP 事件 createSubscription 功能示例

import (
"github.com/redhat-cne/sdk-go/pkg/pubsub"
"github.com/redhat-cne/sdk-go/pkg/types"
v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub"
)

// Subscribe to PTP events using REST API
s1,_:=createsubscription("/cluster/node/<node_name>/sync/sync-status/os-clock-sync-state") 1
s2,_:=createsubscription("/cluster/node/<node_name>/sync/ptp-status/ptp-clock-class-change")
s3,_:=createsubscription("/cluster/node/<node_name>/sync/ptp-status/lock-state")

// Create PTP event subscriptions POST
func createSubscription(resourceAddress string) (sub pubsub.PubSub, err error) {
  var status int
      apiPath:= "/api/ocloudNotifications/v1/"
      localAPIAddr:=localhost:8989 // vDU service API address
      apiAddr:= "localhost:8089" // event framework API address

  subURL := &types.URI{URL: url.URL{Scheme: "http",
    Host: apiAddr
    Path: fmt.Sprintf("%s%s", apiPath, "subscriptions")}}
  endpointURL := &types.URI{URL: url.URL{Scheme: "http",
    Host: localAPIAddr,
    Path: "event"}}

  sub = v1pubsub.NewPubSub(endpointURL, resourceAddress)
  var subB []byte

  if subB, err = json.Marshal(&sub); err == nil {
    rc := restclient.New()
    if status, subB = rc.PostWithReturn(subURL, subB); status != http.StatusCreated {
      err = fmt.Errorf("error in subscription creation api at %s, returned status %d", subURL, status)
    } else {
      err = json.Unmarshal(subB, &sub)
    }
  } else {
    err = fmt.Errorf("failed to marshal subscription for %s", resourceAddress)
  }
  return
}

1
<node_name> 替换为正在生成 PTP 事件的节点 FQDN。例如,compute-1.example.com

Go 中的 PTP 事件消费者 getCurrentState 功能示例

//Get PTP event state for the resource
func getCurrentState(resource string) {
  //Create publisher
  url := &types.URI{URL: url.URL{Scheme: "http",
    Host: localhost:8989,
    Path: fmt.SPrintf("/api/ocloudNotifications/v1/%s/CurrentState",resource}}
  rc := restclient.New()
  status, event := rc.Get(url)
  if status != http.StatusOK {
    log.Errorf("CurrentState:error %d from url %s, %s", status, url.String(), event)
  } else {
    log.Debugf("Got CurrentState: %s ", event)
  }
}