第20章 PTP イベントコンシューマーアプリケーションの開発

ベアメタルクラスターノードで Precision Time Protocol (PTP)イベントを使用するコンシューマーアプリケーションを開発する場合は、コンシューマーアプリケーションと cloud-event-proxy コンテナーを別のアプリケーション Pod にデプロイする必要があります。cloud-event-proxy コンテナーは、PTP Operator Pod からイベントを受信し、これをコンシューマーアプリケーションに渡します。コンシューマーアプリケーションは、REST API を使用して cloud-event-proxy コンテナーで投稿されたイベントにサブスクライブします。

PTP イベントアプリケーションのデプロイに関する詳細は、About the PTP fast event notifications framework を参照してください。

注記

以下の情報は、PTP イベントを使用するコンシューマーアプリケーションを開発するための一般的なガイダンスです。完全なイベントコンシューマーアプリケーションの例は、この情報の範囲外です。

20.1. PTP イベントコンシューマーアプリケーションのリファレンス

PTP イベントコンシューマーアプリケーションには次の機能が必要です。

  1. POST ハンドラーで実行され、クラウドネイティブ PTP イベントの JSON ペイロードを受信する Web サービス
  2. PTP イベントプロデューサーをサブスクライブするための createSubscription 関数
  3. PTP イベントプロデューサーの現在の状態をポーリングする getCurrentState 関数

次の Go スニペットの例は、これらの要件を示しています。

Go での PTP イベントコンシューマーサーバー関数の例

func server() {
  http.HandleFunc("/event", getEvent)
  http.ListenAndServe("localhost:8989", nil)
}

func getEvent(w http.ResponseWriter, req *http.Request) {
  defer req.Body.Close()
  bodyBytes, err := io.ReadAll(req.Body)
  if err != nil {
    log.Errorf("error reading event %v", err)
  }
  e := string(bodyBytes)
  if e != "" {
    processEvent(bodyBytes)
    log.Infof("received event %s", string(bodyBytes))
  } else {
    w.WriteHeader(http.StatusNoContent)
  }
}

PTP イベントの例 Go の createSubscription 関数

import (
"github.com/redhat-cne/sdk-go/pkg/pubsub"
"github.com/redhat-cne/sdk-go/pkg/types"
v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub"
)

// Subscribe to PTP events using REST API
s1,_:=createsubscription("/cluster/node/<node_name>/sync/sync-status/os-clock-sync-state") 1
s2,_:=createsubscription("/cluster/node/<node_name>/sync/ptp-status/ptp-clock-class-change")
s3,_:=createsubscription("/cluster/node/<node_name>/sync/ptp-status/lock-state")

// Create PTP event subscriptions POST
func createSubscription(resourceAddress string) (sub pubsub.PubSub, err error) {
  var status int
      apiPath:= "/api/ocloudNotifications/v1/"
      localAPIAddr:=localhost:8989 // vDU service API address
      apiAddr:= "localhost:8089" // event framework API address

  subURL := &types.URI{URL: url.URL{Scheme: "http",
    Host: apiAddr
    Path: fmt.Sprintf("%s%s", apiPath, "subscriptions")}}
  endpointURL := &types.URI{URL: url.URL{Scheme: "http",
    Host: localAPIAddr,
    Path: "event"}}

  sub = v1pubsub.NewPubSub(endpointURL, resourceAddress)
  var subB []byte

  if subB, err = json.Marshal(&sub); err == nil {
    rc := restclient.New()
    if status, subB = rc.PostWithReturn(subURL, subB); status != http.StatusCreated {
      err = fmt.Errorf("error in subscription creation api at %s, returned status %d", subURL, status)
    } else {
      err = json.Unmarshal(subB, &sub)
    }
  } else {
    err = fmt.Errorf("failed to marshal subscription for %s", resourceAddress)
  }
  return
}

1
<node_name> を、PTP イベントを生成しているノードの FQDN に置き換えます。compute-1.example.com はその例です。

Go の PTP イベントコンシューマー getCurrentState 関数の例

//Get PTP event state for the resource
func getCurrentState(resource string) {
  //Create publisher
  url := &types.URI{URL: url.URL{Scheme: "http",
    Host: localhost:8989,
    Path: fmt.SPrintf("/api/ocloudNotifications/v1/%s/CurrentState",resource}}
  rc := restclient.New()
  status, event := rc.Get(url)
  if status != http.StatusOK {
    log.Errorf("CurrentState:error %d from url %s, %s", status, url.String(), event)
  } else {
    log.Debugf("Got CurrentState: %s ", event)
  }
}