SyncGatewayの_changesでドキュメントの変更通知を受信
10 Apr 2015 | CouchbaseSyncGatewayの_changes APIを利用して、変更されたドキュメントを継続的に取得するcURLサンプル。Sync Gatewayのソースコードを読みながら、指定できるオプションなどを紹介します。
用途としては、Couchbase Liteなどから登録されたドキュメントをストリームで受信し、リアルタイムな分析を行うストリーム処理系に渡すなどがあります。
ドキュメントの登録
まずは、ドキュメントの登録方法から見てみましょう。 データベース名のURLに保存するJSONをPOSTするだけです、簡単ですね。
curl -H "Content-Type: application/json" http://localhost:4984/kitchen-sync/ \
-d '{"source": "cURL", "val": 1}'
HTTP/1.1 200 OK
Content-Length: 94
Content-Type: application/json
Etag: 1-19ae5baa29d8b81534fb86de3407f482
Location: e13b68fe5f561a78ece06bf7be322c36
Server: Couchbase Sync Gateway/1.00
Date: Fri, 10 Apr 2015 03:44:21 GMT
{"id":"e13b68fe5f561a78ece06bf7be322c36","ok":true,"rev":"1-19ae5baa29d8b81534fb86de3407f482"}
_changes API
GET \/{db}\/_changes
このエンドポイントで変更通知を受信することができます。 何もパラメータを指定しないと、データベース開始時点からの変更が全て返却されます。 ただし、同じドキュメントを複数回更新している場合は、最新のリビジョンのみを返します。
curl -H "Content-Type: application/json" http://localhost:4984/kitchen-sync/_changes
クエリパラメータ
_changesにはいくつかのパラメータを指定できます:
- include_docs: デフォルトではドキュメントidとrevのみが返される
- limit: 件数を絞る
- descending: 順序を逆にする、最新のn件を取得したい場合に <– 使いたかったのだけど…
- … etc
基本的にはCouchbaseLiteのドキュメントに記載されているのと同じですが、SyncGatewayでは一部実装されていないものもあるようですね。
SyncGatewayのGo実装を見てみよう
SyncGatewayで指定可能なパラメータはこちらを参照すると良いでしょう。 sync_gateway/db/changes.go
// Options for changes-feeds
type ChangesOptions struct {
Since SequenceID // sequence # to start _after_
Limit int // Max number of changes to return, if nonzero
Conflicts bool // Show all conflicting revision IDs, not just winning one?
IncludeDocs bool // Include doc body of each change?
Wait bool // Wait for results, instead of immediately returning empty result?
Continuous bool // Run continuously until terminated?
Terminator chan bool // Caller can close this channel to terminate the feed
HeartbeatMs uint64 // How often to send a heartbeat to the client
TimeoutMs uint64 // After this amount of time, close the longpoll connection
}
Descendingは見当たらないですね。 しかし、SinceとConitnuousを組み合わせると、ある時点から継続的に変更を受信することができます。
ログを見ると、Continuousがtrueになっているリクエストがありました。Couchbase LiteからはWebSocketを利用して継続的に変更を取得しているようです。
13:14:30.598891 HTTP: #002: GET /kitchen-sync/_changes?feed=websocket
13:14:30.599522 HTTP+: #002: --> 101 Upgraded to WebSocket protocol (0.0 ms)
13:14:30.600480 Changes: MultiChangesFeed({*}, {Since:212 Limit:0 Conflicts:true IncludeDocs:false Wait:true Continuous:true Terminator:0xc210054060}) ...
HTTPリクエストで設定したパラメータを解析して、前述のChangesOptionsを生成しています。 パラメータに何を指定すれば良いのかは、restパッケージのhandleChanges()を見ると分かりそうですね。
sync_gateway/rest/changes_api.go
feedの指定は以下の通り:
switch feed {
case "normal", "":
return h.sendSimpleChanges(userChannels, options)
case "longpoll":
options.Wait = true
return h.sendSimpleChanges(userChannels, options)
case "continuous":
return h.sendContinuousChangesByHTTP(userChannels, options)
case "websocket":
return h.sendContinuousChangesByWebSocket(userChannels, options)
default:
return base.HTTPErrorf(http.StatusBadRequest, "Unknown feed type")
}
cURLで実行してみよう
コンソールを二つ立ち上げます。 一つ目のコンソールで変更をリスンしつつ、
// feedオプションを指定
curl -H "Content-Type: application/json" \
"http://localhost:4984/kitchen-sync/_changes?feed=continuous&include_docs=true"
もう一つのコンソールから、複数のドキュメントを登録してみましょう。
for i in `seq 1 3`;
do
curl -H "Content-Type: application/json" http://localhost:4984/kitchen-sync/ \
-d "{\"source\": \"cURL\", \"val\": $i}";
done
一つ目のコンソールで、変更されたドキュメントが表示されました!
{"seq":236,"id":"695d59cc3bacf246fbf4e945fcb038b6","doc":{"_id":"695d59cc3bacf246fbf4e945fcb038b6","_rev":"1-81243e5196b32067c9605cea71047dee","source]":"cURL","val":1},"changes":[{"rev":"1-81243e5196b32067c9605cea71047dee"}]}
{"seq":237,"id":"b1a9942769ad7da2acbc7dee8fe5dc84","doc":{"_id":"b1a9942769ad7da2acbc7dee8fe5dc84","_rev":"1-f3d312e52dc49359967a4eae6dab7303","source]":"cURL","val":2},"changes":[{"rev":"1-f3d312e52dc49359967a4eae6dab7303"}]}
{"seq":238,"id":"8bfea699dcd19efd246fc3a3f43f30e5","doc":{"_id":"8bfea699dcd19efd246fc3a3f43f30e5","_rev":"1-116e5606156a893fa452537aa007f529","source]":"cURL","val":3},"changes":[{"rev":"1-116e5606156a893fa452537aa007f529"}]}