Red Hat Training
A Red Hat training course is available for Red Hat Fuse
第108章 MongoDB
Camel MongoDB コンポーネント
Camel 2.10 以降で利用可能
Wikipedia: "NoSQL is a moving promoting a loosely defined class of loosely defined class of non-elational databases and ACID guarantee." 過去数年、NoSQL ソリューションが人気が増し、Facebook、LinkedIn、Twitter などの主要な非常に使用されているサイトやサービスは、それらを広範囲に使用することが知られています。これにより、スケーラビリティーと調整性が広範囲に活用されています。
基本的に、NoSQL ソリューションは、SQL をクエリー言語として使用しておらず、通常は ACID と同様のトランザクション動作やリレーショナルデータベースを提供しないという点で、従来の NORMAL (Relational Database Management Systems)とは異なります。代わりに、柔軟なデータ構造とスキーマの概念を中心としています(つまり、固定されたスキーマを持つデータベーステーブルの概念が破棄され、コモディティーハードウェアおよびブラジング処理における非常に高いスケーラビリティーがあります)。
MongoDB は非常に人気の高い NoSQL ソリューションであり、camel-mongodb コンポーネントは Camel と MongoDB を統合するため、MongoDB コレクションをプロデューサー(コレクションでの操作)およびコンシューマー(MongoDB コレクションからのドキュメント)として対話できます。
MongoDB はドキュメントの概念に関するものです(オフィスのドキュメントではなく、JSON/BSON で定義された階層データ)。このコンポーネントページは、それらに精通していることを前提としています。それ以外の場合は、http://www.mongodb.org/ にアクセスします。
Maven ユーザーは、このコンポーネントの
pom.xml
に以下の依存関係を追加する必要があります。
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-mongodb</artifactId> <version>2.17.0.redhat-630xxx</version> <!-- use the same version as your Camel core version --> </dependency>
URI 形式
mongodb:connectionBean?database=databaseName&collection=collectionName&operation=operationName[&moreOptions...]
エンドポイントオプション
MongoDB エンドポイントは、以下のオプションをサポートします。これらのオプションは、Producer または Consumer のどちらとして動作するかによって異なります(オプションはコンシューマータイプによって変わります)。
名前 | デフォルト値 | 説明 | プロデューサー | Tailable Cursor コンシューマー |
---|---|---|---|---|
database
|
none |
必須。このエンドポイントがバインドされるデータベースの名前。動的性が有効で、 CamelMongoDbDatabase ヘッダーが設定されていない限り、すべての操作はこのデータベースに対して実行されます。
|
(/) | (/) |
コレクション
|
none |
必須(getDbStats およびコマンド操作の検証)。 このエンドポイントがバインドされるコレクションの名前(指定されたデータベース内)。動的性が有効で、 CamelMongoDbDatabase ヘッダーが設定されていない限り、ll 操作はこのデータベースに対して実行されます。
|
(/) | (/) |
collectionIndex
|
none | Camel 2.12: 新しいコレクションを挿入する際に作成するオプションの 単一フィールド インデックス または複合インデックス。 | (/) | |
operation
|
none |
プロデューサーに必要です。このエンドポイントが実行する操作の ID。以下を選択します。
|
(/) | |
createCollection
|
true |
エンドポイントの初期中にコレクションが存在しない場合に、そのコレクションを MongoDB データベースに自動作成するかどうかを決定します。このオプションが false でコレクションが存在しない場合は、初期例外が出力されます。
|
(/) | |
writeConcern
|
none (ドライバーのデフォルト) |
MongoDB のパラメーター化された値から操作に WriteConcern を設定します。WriteConcern.valueOf (String) を参照してください。
|
(/) | |
writeConcernRef
|
none |
レジストリーに存在するカスタム WriteConcern を設定します。Bean 名を指定します。
|
(/) | |
readPreference
|
none |
Camel 2.12.4、2.13.1、および 2.14.0 で利用可能:接続に ReadPreference を設定し ます。許可される値は、ReadPreference#valueOf () パブリック API でサポートされる値です。現在、MongoDB-Java-Driver バージョン 2.12.0 の時点で、サポートされる値は
primary 、primaryPreferred 、secondary 、および secondaryPreferred です。nearest このオプションの詳細については、の ドキュメント を参照してください。
|
(/) | |
dynamicity
|
false |
true に設定すると、エンドポイントは受信メッセージの CamelMongoDbDatabase ヘッダーおよび CamelMongoDbCollection ヘッダーを検査し、それらのいずれかが存在する場合は、その特定の操作に対してターゲットコレクションやデータベースが上書きされます。この機能が必要ない場合にすべてのエクスチェンジでルックアップがトリガーされないようにするには、デフォルトで false に設定します。
|
(/) | |
writeResultAsHeader
|
false |
Camel 2.10.3 および 2.11 で利用可能:書き込み操作(保存、更新、挿入など )で、ボディーを MongoDB によって返される WriteResult オブジェクトに置き換えるのではなく、入力ボディーを変更せず、CamelMongoWriteResult ヘッダー(constant MongoDbConstants.WRITERESULT )に WriteResult を配置します。
|
(/) | |
outputType
|
DBObjectList の場合: findAll
DBObject その他のすべての操作の場合
|
Camel 2.16: プロデューサーの出力を
DBObjectList 、DBObject または DBCursor の選択したタイプに変換します。
DBObjectList または DBCursor (出力をストリーミングするのに便利な場合があります)は、findAll に適用されます。DBObject その他のすべての操作に適用されます。
|
(/) | |
persistentTailTracking
|
false | Tailable Cursor コンシューマーの永続的なテールトラッキングを有効または無効にします。詳細は、以下を参照してください。 | (/) | |
persistentId
|
none | 永続的なテールトラッキングが有効な場合は必須です。この永続テールトラッカーの ID。これは、テールトラッキングコレクションでレコードを残りから分離します。 | (/) | |
tailTrackingIncreasingField
|
none | 永続的なテールトラッキングが有効な場合は必須です。増加する性質の着信レコードの相関フィールドであり、生成されるたびに tail カーソルを配置するために使用されます。カーソルは type: tailTrackIncreasingField > lastValue のクエリーで作成されます(lastValue は永続的なテールトラッキングから復元される可能性があります)。型は Integer、Date、String などにすることができます。注記:現在の時点でドット表記はサポートされていません。そのため、フィールドはドキュメントの最上位に置かれます。 | (/) | |
cursorRegenerationDelay
|
1000ms | MongoDB サーバーによって強制終了されてから、エンドポイントがカーソルの再生成を待機する時間を指定します(通常の動作)。 | (/) | |
tailTrackDb
|
エンドポイントの と同じです。 | 永続的なテールトラッカーがランタイム情報を保存するデータベース。 | (/) | |
tailTrackCollection
|
camelTailTracking | 永続テールトラッカーがランタイム情報を保存するコレクション。 | (/) | |
tailTrackField
|
lastTrackingValue | 永続的なテールトラッカーが最後に追跡された値を保存するフィールド。 | (/) |
Spring XML でのデータベースの設定
以下の Spring XML は、MongoDB インスタンスへのコネクションを定義する Bean を作成します。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="mongoBean" class="com.mongodb.Mongo"> <constructor-arg name="host" value="${mongodb.host}" /> <constructor-arg name="port" value="${mongodb.port}" /> </bean> </beans>
サンプルルート
Spring XML で定義された以下のルートは、コレクションで操作 dbStats を実行します。
<route> <from uri="direct:start" /> <!-- using bean 'mongoBean' defined above --> <to uri="mongodb:mongoBean?database=${mongodb.database}&collection=${mongodb.collection}&operation=getDbStats" /> <to uri="direct:result" /> </route>
MongoDB 操作 - プロデューサーエンドポイント
クエリー操作
findById
この操作は、_id フィールドが IN メッセージボディーの内容と一致するコレクションから 1 つの要素のみを取得します。受信オブジェクトは、BSON タイプと同等のものにすることができます。http://bsonspec.org/#/specification および http://www.mongodb.org/display/DOCS/Java+Types を参照してください。
from("direct:findById") .to("mongodb:myDb?database=flights&collection=tickets&operation=findById") .to("mock:resultFindById");
supports fields filter (フィールドフィルターに対応)
この操作は、フィールドフィルターの指定をサポートします。「フィールドフィルターの指定」を参照してください。
findOneByQuery
この操作を使用して、MongoDB クエリーに一致するコレクションから 1 つの要素のみを取得します。クエリーオブジェクトは IN メッセージボディー から抽出され ます。つまり、
DBObject
型であるか、DBObject
に変換できる必要があります。JSON 文字列または Hashmap にすることができます。詳細は 、タイプ変換 を参照してください。
クエリーのない例(コレクションの任意のオブジェクトを返します)。
from("direct:findOneByQuery") .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery") .to("mock:resultFindOneByQuery");
クエリーの例(一致する結果 1 つを返す):
from("direct:findOneByQuery") .setBody().constant("{ \"name\": \"Raul Kripalani\" }") .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery") .to("mock:resultFindOneByQuery");
supports fields filter (フィールドフィルターに対応)
この操作は、フィールドフィルターの指定をサポートします。「フィールドフィルターの指定」を参照してください。
findAll
findAll
操作は、クエリーに一致するすべてのドキュメント、またはまったく none を返します。この場合、コレクションに含まれるすべてのドキュメントが返されます。クエリーオブジェクトは IN メッセージボディー から抽出され ます。つまり、DBObject
型であるか、DBObject
に変換できる必要があります。JSON 文字列または Hashmap にすることができます。詳細は 、タイプ変換 を参照してください。
クエリーのない例(コレクション内のすべてのオブジェクトを返します)。
from("direct:findAll") .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") .to("mock:resultFindAll");
クエリーの例(一致する結果をすべて返す):
from("direct:findAll") .setBody().constant("{ \"name\": \"Raul Kripalani\" }") .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") .to("mock:resultFindAll");
ページングおよび効率的な取得は、以下のヘッダーでサポートされます。
ヘッダーのキー | クイック定数 | 説明 (MongoDB API ドキュメントから抜粋) | 想定されるタイプ |
---|---|---|---|
CamelMongoDbNumToSkip
|
MongoDbConstants.NUM_TO_SKIP
|
カーソルの先頭にある特定数の要素を破棄します。 | int/Integer |
CamelMongoDbLimit
|
MongoDbConstants.LIMIT
|
返された要素の数を制限します。 | int/Integer |
CamelMongoDbBatchSize
|
MongoDbConstants.BATCH_SIZE
|
1 つのバッチで返される要素の数を制限します。カーソルは通常、結果オブジェクトのバッチを取得してローカルに保存します。batchSize が正の場合、取得したオブジェクトの各バッチのサイズを表します。これを調整して、パフォーマンスを最適化し、データ転送を制限することができます。batchSize が負の値の場合、最大バッチサイズ制限(通常は 4MB)内に一致する、返されるオブジェクト数に制限し、カーソルは閉じられます。たとえば、batchSize が -10 の場合、サーバーは最大 10 ドキュメントを返し、最大で 4MB に該当するドキュメントを返し、カーソルを閉じます。この機能は、ドキュメントが最大サイズ内に収まる必要がある点で limit ()とは異なるため、カーソルサーバー側を閉じるために要求を送信する必要がなくなります。バッチサイズは、カーソルの反復後にも変更できます。この場合、設定は次のバッチ取得に適用されます。 | int/Integer |
さらに、
CamelMongoDbSortBy
ヘッダーでソートを記述する関連する DBObject
を配置することで sortBy 条件を設定することもできます。MongoDbConstants.SORT_BY
.
findAll
操作は以下の OUT ヘッダーも返し、ページングを使用している場合に結果ページを反復できるようにします。
ヘッダーのキー | クイック定数 | 説明 (MongoDB API ドキュメントから抜粋) | データのタイプ |
---|---|---|---|
CamelMongoDbResultTotalSize
|
MongoDbConstants.RESULT_TOTAL_SIZE
|
クエリーに一致するオブジェクトの数。これは、制限/スキップを考慮していません。 | int/Integer |
CamelMongoDbResultPageSize
|
MongoDbConstants.RESULT_PAGE_SIZE
|
クエリーに一致するオブジェクトの数。これは、制限/スキップを考慮していません。 | int/Integer |
supports fields filter (フィールドフィルターに対応)
この操作は、フィールドフィルターの指定をサポートします。「フィールドフィルターの指定」を参照してください。
count
Long
を Out メッセージボディーとして返すコレクション内のオブジェクトの合計数を返します。以下の例では、dynamicCollectionName
コレクションのレコード数をカウントします。動的性が有効になっている方法、その結果、操作は notableScientists
コレクションに対しては実行されず、dynamicCollectionName
コレクションに対しては実行されません。
// from("direct:count").to("mongodb:myDb?database=tickets&collection=flights&operation=count&dynamicity=true"); Long result = template.requestBodyAndHeader("direct:count", "irrelevantBody", MongoDbConstants.COLLECTION, "dynamicCollectionName"); assertTrue("Result is not of type Long", result instanceof Long);
Camel 2.14 以降では、メッセージボディーに
com.mongodb.DBObject
オブジェクトをクエリーとして提供でき、操作はこの基準に一致するドキュメントの数を返します。
DBObject query = ... Long count = template.requestBodyAndHeader("direct:count", query, MongoDbConstants.COLLECTION, "dynamicCollectionName");
フィールドフィルターの指定
クエリー操作は、デフォルトでは、(フィールドすべてとともに)一致するオブジェクトを完全に返します。ドキュメントのサイズが大きく、フィールドのサブセットのみを取得する必要がある場合は、
CamelMongoDbFields
.FIELDS_FILTER
ヘッダーで関連する DBObject
(または JSON String や Map など)を DBObject に変換するだけで、すべてのクエリー操作でフィールドフィルターを指定できます。
以下は、MongoDB の BasicDBObjectBuilder を使用して DBObject の作成を簡素化する例です。
_id
および boringField
を除くすべてのフィールドを取得します。
// route: from("direct:findAll").to("mongodb:myDb?database=flights&collection=tickets&operation=findAll") DBObject fieldFilter = BasicDBObjectBuilder.start().add("_id", 0).add("boringField", 0).get(); Object result = template.requestBodyAndHeader("direct:findAll", (Object) null, MongoDbConstants.FIELDS_FILTER, fieldFilter);
作成/更新の操作
insert
IN メッセージボディーから取得した MongoDB コレクションに新しいオブジェクトを挿入します。型変換は、
DBObject
または List
に変換しようとします。単一の挿入と複数の挿入の 2 つのモードがサポートされます。複数の挿入の場合、エンドポイントは、- の場合、または - DBObject
に変換できる限り、任意のタイプのオブジェクトの List、Array、または Collections を想定します。すべてのオブジェクトは一度に挿入されます。エンドポイントは、入力に応じて呼び出すバックエンド操作(1 つまたは複数の挿入)をインテリジェントに決定します。
以下に例を示します。
from("direct:insert") .to("mongodb:myDb?database=flights&collection=tickets&operation=insert");
操作は WriteResult を返し、
WriteConcern
または invokeGetLastError
オプションの値によっては、getLastError ()
はすでに呼び出されます。書き込み操作の最終結果にアクセスする場合は、WriteResult
で getLastError ()または getCachedLastError
()
を呼び出して CommandResult
を取得する必要があります。次に、CommandResult. ok ()、CommandResult. getErrorMessage ()
、および/または CommandResult.getException
(
)を呼び出すことで結果を確認できます。
新規オブジェクトの
_id
はコレクションで一意である必要があることに注意してください。値を指定しないと、MongoDB が自動的に生成されます。しかし、それを指定して一意でない場合は、挿入操作は失敗します(Camel が通知する場合は、callGetLastError を有効にするか、書き込み結果を待つ WriteConcern を設定する必要があります)。
これはコンポーネントの制限ではありませんが、スループットを高めるために MongoDB でどのように機能するかです。カスタムの
_id
を使用している場合は、アプリケーションレベルが一意であることを確認する必要があります(これはグッドプラクティスでもあります)。
Camel 2.15 以降、挿入されたレコードの OID は、
CamelMongoOid
キー下のメッセージヘッダー(MongoDbConstants.OID
定数)に保存されます。1 つの挿入の場合は org.bson.types.ObjectId
を保存する値は java.util.List<org.bson.types.ObjectId>
で、複数のレコードが挿入されている場合は になります。
save
save 操作は upsert (UPdate、inSERT)操作と同等で、レコードが更新され、存在しない場合は、すべて 1 つのアトミック操作に挿入されます。MongoDB は _id フィールドに基づいてマッチングを実行します。
更新の場合、オブジェクトは完全に置き換えられ、MongoDB の $modifiers の 使用が許可されないことに注意してください。そのため、オブジェクトがすでに存在する場合は、2 つのオプションがあります。
- クエリーを実行して、オブジェクト全体をそのフィールドすべてと共に最初に取得します(効率的ではない場合もあります)、Camel 内で変更し、保存します。
- $modifiers とともに更新操作を使用してください。この操作は、代わりにサーバー側で更新を実行します。upsert フラグを有効にすることができます。この場合、挿入が必要な場合、MongoDB は $modifiers をフィルタークエリーオブジェクトに適用し、結果を挿入します。
以下に例を示します。
from("direct:insert") .to("mongodb:myDb?database=flights&collection=tickets&operation=save");
update
コレクションの 1 つまたは複数のレコードを更新します。正確に 2 つの要素が含まれる IN メッセージボディーとして List<DBObject> が必要です。
- 要素 1 (インデックス 0)=> フィルタークエリー => は、通常のクエリーオブジェクトと同様に、影響を受けるオブジェクトを決定します。
- element 2 (index 1)=> update rules => update rules => how matched objects will be updated.MongoDB からの 修飾子操作 はすべてサポートされます。
Multiupdates
デフォルトでは、複数のオブジェクトがフィルタークエリーと一致する場合でも、MongoDB は 1 オブジェクトのみを更新します。一致する すべて のレコードを更新するよう MongoDB に指示するには、
CamelMongoDbMultiUpdate
IN メッセージヘッダーを true
に設定します。
キーの
CamelMongoDbRecordsAffected
のあるヘッダーは、更新されたレコードの数( WriteResult.getN ()からコピー)を含むレコードの数とともに(MongoDbConstants.RECORDS_AFFECTED
定数)を返します。
以下の IN メッセージヘッダーをサポートします。
ヘッダーのキー | クイック定数 | 説明 (MongoDB API ドキュメントから抜粋) | 想定されるタイプ |
---|---|---|---|
CamelMongoDbMultiUpdate
|
MongoDbConstants.MULTIUPDATE
|
更新を一致するすべてのオブジェクトに適用する必要がある場合。http://www.mongodb.org/display/DOCS/Atomic+Operations を参照してください。 | boolean/Boolean |
CamelMongoDbUpsert
|
MongoDbConstants.UPSERT
|
データベースが存在しない場合には、要素を作成する必要がある | boolean/Boolean |
たとえば、以下は "scientist" フィールドの値を "Darwin" に設定することにより、filterField フィールドが true である すべて のレコードを更新します。
// route: from("direct:update").to("mongodb:myDb?database=science&collection=notableScientists&operation=update"); DBObject filterField = new BasicDBObject("filterField", true); DBObject updateObj = new BasicDBObject("$set", new BasicDBObject("scientist", "Darwin")); Object result = template.requestBodyAndHeader("direct:update", new Object[] {filterField, updateObj}, MongoDbConstants.MULTIUPDATE, true);
操作の削除
remove
コレクションから一致するレコードを削除します。IN メッセージ本文は削除フィルタークエリーとして機能し、タイプ
DBObject
または型として変換可能であることが予想されます。以下の例では、サイエンスデータベース、notableScientists コレクションで、フィールド 'conditionField' が true であるすべてのオブジェクトを削除します。
// route: from("direct:remove").to("mongodb:myDb?database=science&collection=notableScientists&operation=remove"); DBObject conditionField = new BasicDBObject("conditionField", true); Object result = template.requestBody("direct:remove", conditionField);
キーの
CamelMongoDbRecordsAffected
のあるヘッダーが返され、タイプ int
のあるMongoDbConstants.RECORDS_AFFECTED
定数とともに返されます。これには、削除されたレコードの数( WriteResult.getN ()からコピー)が含まれます。
その他の操作
aggregate
Camel 2.14: ボディーに含まれる指定のパイプラインで集約を実行します。集約は長く重い操作になる可能性があります。注意して使用してください。
// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate"); from("direct:aggregate") .setBody().constant("[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]") .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate") .to("mock:resultAggregate");
getDbStats
MongoDB シェルで
db.stats ()
コマンドを実行するのと同等のもので、データベースに関する有用な統計情報が表示されます。以下に例を示します。
> db.stats(); { "db" : "test", "collections" : 7, "objects" : 719, "avgObjSize" : 59.73296244784423, "dataSize" : 42948, "storageSize" : 1000058880, "numExtents" : 9, "indexes" : 4, "indexSize" : 32704, "fileSize" : 1275068416, "nsSizeMB" : 16, "ok" : 1 }
使用例:
// from("direct:getDbStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getDbStats"); Object result = template.requestBody("direct:getDbStats", "irrelevantBody"); assertTrue("Result is not of type DBObject", result instanceof DBObject);
操作は、OUT メッセージボディーの
DBObject
の形式で、シェルに表示されるデータ構造と同様のデータ構造を返します。
getColStats
MongoDB シェルで
db.collection.stats ()
コマンドを実行するのと同等のもので、コレクションに関する有用な統計図を表示します。以下に例を示します。
> db.camelTest.stats(); { "ns" : "test.camelTest", "count" : 100, "size" : 5792, "avgObjSize" : 57.92, "storageSize" : 20480, "numExtents" : 2, "nindexes" : 1, "lastExtentSize" : 16384, "paddingFactor" : 1, "flags" : 1, "totalIndexSize" : 8176, "indexSizes" : { "_id_" : 8176 }, "ok" : 1 }
使用例:
// from("direct:getColStats").to("mongodb:myDb?database=flights&collection=tickets&operation=getColStats"); Object result = template.requestBody("direct:getColStats", "irrelevantBody"); assertTrue("Result is not of type DBObject", result instanceof DBObject);
操作は、OUT メッセージボディーの
DBObject
の形式で、シェルに表示されるデータ構造と同様のデータ構造を返します。
command
Camel 2.15: ボディーをデータベースでコマンドとして実行します。ホスト情報、レプリケーション、またはシャーディングのステータスを取得する際に、管理操作に役立ちます。コレクションパラメーターは、この操作には使用されません。
// route: from("command").to("mongodb:myDb?database=science&operation=command"); DBObject commandBody = new BasicDBObject("hostInfo", "1"); Object result = template.requestBody("direct:command", commandBody);
動的操作
Exchange は、
MongoDbConstants.OPERATION_HEADER
定数によって定義される CamelMongoDbOperation
ヘッダーを設定することで、エンドポイントの固定操作を上書きできます。サポートされる値は MongoDbOperation の列挙によって決定され、エンドポイント URI の operation
パラメーターの許可される値と一致します。
以下に例を示します。
// from("direct:insert").to("mongodb:myDb?database=flights&collection=tickets&operation=insert"); Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count"); assertTrue("Result is not of type Long", result instanceof Long);
Tailable Cursor コンシューマー
MongoDB は、*nix システムの
tail -f
コマンドと同様に、カーソルを開いた状態に維持することで、コレクションから継続中のデータを即座に消費するメカニズムを提供します。このメカニズムは、新しいデータを取得するためにクライアントがスケジュールされた間隔でクライアントに新しいデータをプッシュするので、スケジュールされたポーリングよりも効率的です。また、その他の冗長ネットワークトラフィックも軽減します。
テール可能なカーソルを使用する必要があるのは 1 つだけです。コレクションはキャプチャーコレクションである必要があります。つまり、このコレクションは N オブジェクトのみを保持し、制限に達すると MongoDB は、最初に挿入された順序で古いオブジェクトをフラッシュします。詳細は、http://www.mongodb.org/display/DOCS/Tailable+Cursors を参照してください。
Camel MongoDB コンポーネントはテール可能なカーソルコンシューマーを実装し、この機能を Camel ルートで使用できるようにします。新しいオブジェクトが挿入されると、MongoDB は DBObjects として、テール可能なカーソルコンシューマーに DBObject としてプッシュします。これにより、エクスチェンジに変換され、ルートロジックがトリガーされます。
テール可能なカーソルコンシューマーの仕組み
カーソルをテール可能なカーソルに移動するには、最初にカーソルを生成する際に、いくつかの特別なフラグが MongoDB に通知されます。作成が完了すると、カーソルは開いたままになり、新しいデータが到着するまで
DBCursor.next ()
メソッドを呼び出すとブロックされます。ただし、MongoDB サーバーは、新しいデータが不確定な期間の後に表示されない場合に、カーソルを終了する権利を留保します。新しいデータを引き続き使用する場合は、カーソルを再生成する必要があります。これには、停止した位置を覚えておく必要があります。そうでないと、最初からやり直し始めます。
Camel MongoDB テール可能なカーソルコンシューマーは、これらすべてのタスクを処理します。増加する性のあるデータ内の一部のフィールドにキーを指定する必要があります。これは、タイムスタンプ、順次 ID など、再生成されるたびにカーソルを配置するためのマーカーとして機能します。MongoDB でサポートされている任意のデータ型にすることができます。日付、文字列、および整数が適切に機能します。このメカニズムには、このコンポーネントのコンテキストで追跡が必要です。
コンシューマーはこのフィールドの最後の値を記憶し、カーソルを再生成するたびに、
increasingField > lastValue
のようなフィルターでクエリーを実行し、未読のデータのみが消費されるようにします。
増加フィールドの設定:エンドポイント URI
tailTrackingIncreasingField
オプションの increasing フィールドのキーを設定 します。Camel 2.10 では、このフィールドのネストされたナビゲーションはまだサポートされていないため、データの最上位フィールドである必要があります。つまり、timestamp フィールドは問題ありませんが、nested.timestamp は機能しません。ネストされた増加フィールドのサポートが必要な場合は、Camel JIRA でチケットを作成してください。
カーソルの再生成遅延: 初期化時に新しいデータがまだ利用できない場合、MongoDB はすぐにカーソルを強制終了します。この場合、サーバーに圧倒をかけないため、(デフォルト値 1000ms.)
cursorRegenerationDelay
オプションが導入されました。これは、ニーズに合わせて変更できます。
以下に例を示します。
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime") .id("tailableCursorConsumer1") .autoStartup(false) .to("mock:test");
上記のルートはflights.cancellations の上限付きコレクションから消費されます。これは、増加するフィールドとして departureTime を使用し、デフォルトの再生成カーソル遅延は 1000ms です。
永続的なテールトラッキング
標準のテールトラッキングは揮発性で、最後の値はメモリーにのみ保持されます。ただし、実際には、Camel コンテナーを毎回再起動する必要がありますが、最後の値は失われ、テール可能なカーソルコンシューマーは上部から消費を開始し、重複したレコードをルートに送信する可能性が非常に高くなります。
この状況に対処するために、永続テールトラッキング 機能を有効にして、MongoDB データベース内の特別なコレクションで最後に消費された値を追跡することもできます。コンシューマーが再び初期化されると、最後に追跡された値を復元し、何も起こらなかったかのように続行します。
最後の読み取り値は、2 つの状況で永続化されます。将来的にも一定間隔で永続化することを検討し(5 秒ごとにフラッシュ)、需要がある場合は堅牢性を高めることができます。この機能を要求するには、Camel JIRA でチケットを作成してください。
永続的なテールトラッキングの有効化
この関数を有効にするには、エンドポイント URI に少なくとも以下のオプションを設定します。
persistentTailTracking
オプションをtrue
に設定- このコンシューマー
の
一意の識別子への PersistentID オプション。これにより、多くのコンシューマーで同じコレクションを再利用できます。
さらに、
tailTrackDb
オプション、tailTrackCollection
オプション、および tailTrackField
オプションを設定して、ランタイム情報が保存される場所をカスタマイズすることができます。各オプションの説明は、このページの上部にあるエンドポイントオプションの表を参照してください。
たとえば、以下のルートはflights.cancellations の上限付きコレクションから消費され、増加するフィールドとして departureTime を使用します。デフォルトの再生成カーソル遅延は 1000ms で、永続的なテールトラッキングがオンになり、flights.camelTailTracking の id の下で永続化されます。 lastTrackingValue フィールドに最後に処理された値を保存する(
camelTailTracking
および lastTrackingValue
がデフォルトです)。
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker") .id("tailableCursorConsumer2") .autoStartup(false) .to("mock:test");
以下は上記と同じ例ですが、"lastProcessedDepartureTime" フィールドの "trackers.camelTrackers" コレクションの下に永続的なテールトラッキングランタイム情報が格納されます。
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker"&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + "&tailTrackField=lastProcessedDepartureTime") .id("tailableCursorConsumer3") .autoStartup(false) .to("mock:test");
型変換
camel-mongodb コンポーネントに含まれる
MongoDbBasicConverters
型コンバーターは、以下の変換を提供します。
名前 | タイプから | 以下を入力します。 | どのように変更を加えればよいですか ? |
---|---|---|---|
fromMapToDBObject |
マップ
|
DBObject
|
新しい BasicDBObject (Map m)コンストラクターを使用して新しい BasicDBObject を構築します。
|
fromBasicDBObjectToMap |
BasicDBObject
|
マップ
|
BasicDBObject はすでに マップ を実装しています
|
fromStringToDBObject |
文字列
|
DBObject
|
com.mongodb.util.JSON.parse(String s) を使用する
|
fromAnyObjectToDBObject |
オブジェクト
|
DBObject
|
Jackson ライブラリー を使用してオブジェクトを Map に変換します。これは、新しい BasicDBObject の初期化に使用されます。
|
この型コンバーターは自動検出されるため、手動で設定する必要はありません。