|
| 1 | +Data streamers are defined by the `IgniteDataStreamer` API and are built to inject large, continuous streams of data into Ignite caches. They are designed to be scalable and fault-tolerant, and they provide **at-least-once** delivery semantics for all data streamed into Ignite. |
| 2 | +[block:api-header] |
| 3 | +{ |
| 4 | + "type": "basic", |
| 5 | + "title": "IgniteDataStreamer" |
| 6 | +} |
| 7 | +[/block] |
| 8 | +The main abstraction for fast streaming of large amounts of data into Ignite is `IgniteDataStreamer`, which internally batches keys together and collocates those batches with the nodes on which the data will be cached. |
| 9 | + |
| 10 | +The high loading speed is achieved with the following techniques: |
| 11 | + * Entries that are mapped to the same cluster member will be batched together in a buffer. |
| 12 | + * Multiple buffers can coexist at the same time. |
| 13 | + * To avoid running out of memory, the data streamer limits the number of buffers it can process concurrently. |
| 14 | + |
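The batching idea in the list above can be sketched in plain Java. This is only an illustration under stated assumptions, not Ignite's internal implementation: the class `BatchingSketch`, the `nodeFor` mapping (a simple hash modulo, standing in for real affinity mapping), and the `sent` map are all hypothetical names introduced here. The limit on concurrently processed buffers is not modeled.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of per-node batching; NOT Ignite's implementation.
class BatchingSketch {
    private final int nodeCount;
    private final int bufferSize;

    // One pending buffer per node index; several buffers coexist.
    private final Map<Integer, List<Map.Entry<Integer, String>>> buffers = new HashMap<>();

    // Batches that were "sent" to each node, recorded for inspection.
    final Map<Integer, List<List<Map.Entry<Integer, String>>>> sent = new HashMap<>();

    BatchingSketch(int nodeCount, int bufferSize) {
        this.nodeCount = nodeCount;
        this.bufferSize = bufferSize;
    }

    // Assumption: a hash modulo stands in for the real key-to-node mapping.
    int nodeFor(Integer key) {
        return Math.floorMod(key.hashCode(), nodeCount);
    }

    // Entries mapped to the same node are collected in the same buffer.
    void addData(Integer key, String val) {
        int node = nodeFor(key);
        List<Map.Entry<Integer, String>> buf =
            buffers.computeIfAbsent(node, n -> new ArrayList<>());
        buf.add(new AbstractMap.SimpleEntry<>(key, val));

        // A full buffer is shipped to its node as one batch.
        if (buf.size() >= bufferSize)
            flush(node);
    }

    // Moves the pending buffer of one node to the "sent" record.
    void flush(int node) {
        List<Map.Entry<Integer, String>> buf = buffers.remove(node);
        if (buf != null && !buf.isEmpty())
            sent.computeIfAbsent(node, n -> new ArrayList<>()).add(buf);
    }

    // Flushes whatever is still buffered, as closing a streamer would.
    void close() {
        for (Integer node : new ArrayList<>(buffers.keySet()))
            flush(node);
    }

    public static void main(String[] args) {
        BatchingSketch s = new BatchingSketch(2, 3);
        for (int i = 0; i < 10; i++)
            s.addData(i, Integer.toString(i));
        s.close();
        System.out.println("Batches sent to node 0: " + s.sent.get(0).size());
    }
}
```

The point of the sketch is that the sender pays one network round trip per batch rather than per entry, which is where most of the loading speed comes from.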
| 15 | +To add data to the data streamer, call the `IgniteDataStreamer.addData(...)` method. |
| 16 | +[block:code] |
| 17 | +{ |
| 18 | + "codes": [ |
| 19 | + { |
| 20 | + "code": "// Get the data streamer reference and stream data.\ntry (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(\"myStreamCache\")) {\n    // Stream entries.\n    for (int i = 0; i < 100000; i++)\n        stmr.addData(i, Integer.toString(i));\n}", |
| 21 | + "language": "java" |
| 22 | + } |
| 23 | + ] |
| 24 | +} |
| 25 | +[/block] |
| 26 | +## Allow Overwrite |
| 27 | +By default, the data streamer does not overwrite existing data: if it encounters an entry that is already in the cache, it skips it. This is the most efficient mode, as the data streamer does not have to track data versioning in the background. |
| 28 | + |
| 29 | +If you anticipate that the data may already be in the streaming cache and you need to overwrite it, call `IgniteDataStreamer.allowOverwrite(true)`. |
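
The difference between the two modes can be illustrated with an ordinary `Map` standing in for the cache. This is a minimal sketch of the observable behavior only; `OverwriteSketch` and `load` are hypothetical names, and a real streamer works against a distributed cache rather than a local map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the two modes; NOT Ignite code.
class OverwriteSketch {
    // Applies streamed entries to a local map that stands in for the cache.
    static <K, V> void load(Map<K, V> cache, Map<K, V> streamed, boolean allowOverwrite) {
        for (Map.Entry<K, V> e : streamed.entrySet()) {
            if (allowOverwrite)
                cache.put(e.getKey(), e.getValue());         // Replace existing values.
            else
                cache.putIfAbsent(e.getKey(), e.getValue()); // Skip keys already present.
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("a", "old");

        Map<String, String> streamed = new HashMap<>();
        streamed.put("a", "new");
        streamed.put("b", "new");

        load(cache, streamed, false);
        System.out.println(cache.get("a") + ", " + cache.get("b"));
    }
}
```

With `allowOverwrite` left at `false`, the existing value for `"a"` survives and only `"b"` is added; passing `true` replaces both.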
| 30 | +[block:api-header] |
| 31 | +{ |
| 32 | + "type": "basic", |
| 33 | + "title": "StreamReceiver" |
| 34 | +} |
| 35 | +[/block] |
| 36 | +For cases when you need to execute custom logic instead of just adding new data, you can use the `StreamReceiver` API. |
| 37 | + |
| 38 | +Stream receivers allow you to react to the streamed data in a collocated fashion, directly on the nodes where it will be cached. You can change the data or add any custom pre-processing logic before putting the data into the cache. |
| 39 | +[block:callout] |
| 40 | +{ |
| 41 | + "type": "info", |
| 42 | + "body": "Note that `StreamReceiver` does not put data into the cache automatically. You need to call one of the `cache.put(...)` methods explicitly." |
| 43 | +} |
| 44 | +[/block] |
| 45 | + |
| 46 | +[block:api-header] |
| 47 | +{ |
| 48 | + "type": "basic", |
| 49 | + "title": "StreamTransformer" |
| 50 | +} |
| 51 | +[/block] |
| 52 | +`StreamTransformer` is a convenience implementation of `StreamReceiver` which updates data in the stream cache based on its previous value. The update is collocated, i.e. it happens exactly on the cluster node where the data is stored. |
| 53 | + |
| 54 | +In the example below, we use `StreamTransformer` to increment a counter for each distinct word found in the text stream. |
| 55 | +[block:code] |
| 56 | +{ |
| 57 | + "codes": [ |
| 58 | + { |
| 59 | + "code": "CacheConfiguration<String, Long> cfg = new CacheConfiguration<>(\"wordCountCache\");\n\nIgniteCache<String, Long> stmCache = ignite.getOrCreateCache(cfg);\n\ntry (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {\n    // Allow data updates.\n    stmr.allowOverwrite(true);\n\n    // Configure data transformation to count instances of the same word.\n    stmr.receiver(StreamTransformer.from((e, arg) -> {\n        // Get current count.\n        Long val = e.getValue();\n\n        // Increment count by 1.\n        e.setValue(val == null ? 1L : val + 1);\n\n        return null;\n    }));\n\n    // Stream words into the streamer cache.\n    for (String word : text)\n        stmr.addData(word, 1L);\n}", |
| 60 | + "language": "java", |
| 61 | + "name": "transformer" |
| 62 | + }, |
| 63 | + { |
| 64 | + "code": "CacheConfiguration<String, Long> cfg = new CacheConfiguration<>(\"wordCountCache\");\n\nIgniteCache<String, Long> stmCache = ignite.getOrCreateCache(cfg);\n\ntry (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {\n    // Allow data updates.\n    stmr.allowOverwrite(true);\n\n    // Configure data transformation to count instances of the same word.\n    stmr.receiver(new StreamTransformer<String, Long>() {\n        @Override public Object process(MutableEntry<String, Long> e, Object... args) {\n            // Get current count.\n            Long val = e.getValue();\n\n            // Increment count by 1.\n            e.setValue(val == null ? 1L : val + 1);\n\n            return null;\n        }\n    });\n\n    // Stream words into the streamer cache.\n    for (String word : text)\n        stmr.addData(word, 1L);\n}", |
| 65 | + "language": "java", |
| 66 | + "name": "java7 transformer" |
| 67 | + } |
| 68 | + ] |
| 69 | +} |
| 70 | +[/block] |
| 71 | + |
| 72 | +[block:api-header] |
| 73 | +{ |
| 74 | + "type": "basic", |
| 75 | + "title": "StreamVisitor" |
| 76 | +} |
| 77 | +[/block] |
| 78 | +`StreamVisitor` is also a convenience implementation of `StreamReceiver`, which visits every key-value tuple in the stream. Note that the visitor does not update the cache. If the tuple needs to be stored in the cache, one of the `cache.put(...)` methods must be called explicitly. |
| 79 | + |
| 80 | +In the example below, we have two caches: "marketData" and "instruments". We receive market data ticks and put them into the streamer for the "marketData" cache. The `StreamVisitor` for the "marketData" streamer is invoked on the cluster member mapped to the particular market symbol. Upon receiving an individual market tick, it updates the "instruments" cache with the latest market price. |
| 81 | + |
| 82 | +Note that we do not update the "marketData" cache at all, leaving it empty. We simply use it for collocated processing of the market data within the cluster, directly on the node where the data would be stored. |
| 83 | +[block:code] |
| 84 | +{ |
| 85 | + "codes": [ |
| 86 | + { |
| 87 | + "code": "CacheConfiguration<String, Double> mrktDataCfg = new CacheConfiguration<>(\"marketData\");\nCacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>(\"instruments\");\n\n// Cache for market data ticks streamed into the system.\nIgniteCache<String, Double> mrktData = ignite.getOrCreateCache(mrktDataCfg);\n\n// Cache for financial instruments.\nIgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg);\n\ntry (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(\"marketData\")) {\n    // Note that we do not populate 'marketData' cache (it remains empty).\n    // Instead we update the 'instruments' cache based on the latest market price.\n    mktStmr.receiver(StreamVisitor.from((cache, e) -> {\n        String symbol = e.getKey();\n        Double tick = e.getValue();\n\n        Instrument inst = instCache.get(symbol);\n\n        if (inst == null)\n            inst = new Instrument(symbol);\n\n        // Update instrument price based on the latest market tick.\n        inst.setHigh(Math.max(inst.getLatest(), tick));\n        inst.setLow(Math.min(inst.getLatest(), tick));\n        inst.setLatest(tick);\n\n        // Update instrument cache.\n        instCache.put(symbol, inst);\n    }));\n\n    // Stream market data into Ignite.\n    for (Map.Entry<String, Double> tick : marketData)\n        mktStmr.addData(tick);\n}", |
| 88 | + "language": "java" |
| 89 | + } |
| 90 | + ] |
| 91 | +} |
| 92 | +[/block] |
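
The example above relies on an `Instrument` class that is not shown. A minimal sketch of what it might look like follows, assuming a plain value object with `latest`, `high`, and `low` prices; the field layout and the `InstrumentUpdateSketch.applyTick` helper are assumptions made for illustration, and the helper simply mirrors the visitor body as written (it compares each tick against the latest price, not against the stored high and low).

```java
import java.io.Serializable;

// Hypothetical value class assumed by the example; not part of Ignite.
class Instrument implements Serializable {
    private final String symbol;
    private double latest;
    private double high;
    private double low;

    Instrument(String symbol) {
        this.symbol = symbol;
    }

    String getSymbol() { return symbol; }

    double getLatest() { return latest; }
    void setLatest(double latest) { this.latest = latest; }

    double getHigh() { return high; }
    void setHigh(double high) { this.high = high; }

    double getLow() { return low; }
    void setLow(double low) { this.low = low; }
}

// Hypothetical helper that mirrors the update performed inside the visitor.
class InstrumentUpdateSketch {
    static void applyTick(Instrument inst, double tick) {
        // Same three steps as in the visitor body, in the same order.
        inst.setHigh(Math.max(inst.getLatest(), tick));
        inst.setLow(Math.min(inst.getLatest(), tick));
        inst.setLatest(tick);
    }

    public static void main(String[] args) {
        Instrument inst = new Instrument("IBM");
        applyTick(inst, 10.0);
        applyTick(inst, 8.0);
        System.out.println(inst.getSymbol() + " latest=" + inst.getLatest());
    }
}
```

Because a new `Instrument` starts with a latest price of `0.0`, the first tick yields a low of `0.0` under this logic; an application that needs true session highs and lows would compare against the stored high and low instead.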