You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
"2-1": "Controls maximum number of local ports tried if all previously tried ports are occupied.",
@@ -59,29 +58,27 @@ Following configuration parameters can be optionally configured on `TcpCommuncat
59
58
"7-2": "512 or `IGNITE_MIN_BUFFERED_COMMUNICATION_MSG_CNT` system property value, if set.",
60
59
"8-1": "Sets flag indicating whether dual-socket connection between nodes should be enforced. If set to true, two separate connections will be established between communicating nodes: one for outgoing messages, and one for incoming. When set to false, single TCP connection will be used for both directions.\nThis flag is useful on some operating systems, when TCP_NODELAY flag is disabled and messages take too long to get delivered.",
61
60
"8-2": "false",
62
-
"9-1": "Sets port resolver for internal-to-external port mapping. In some cases network routers are configured to perform port mapping between external and internal networks and the same mapping must be available to SPIs in GridGain that perform communication over IP protocols.",
63
-
"9-2": "null",
64
-
"10-1": "This parameter is used only when `setAsyncSend(boolean)` is set to false. \n\nSets connection buffer size for synchronous connections. Increase buffer size if using synchronous send and sending large amount of small sized messages. However, most of the time this should be set to 0 (default).",
65
-
"10-2": "0",
66
-
"11-1": "Sets the count of selectors to be used in TCP server.",
67
-
"11-2": "Default count of selectors equals to the expression result - \nMath.min(4, Runtime.getRuntime() .availableProcessors())",
68
-
"12-1": "This parameter is used only when `setAsyncSend(boolean)` is set to false. \n\nSets connection buffer flush frequency in milliseconds. This parameter makes sense only for synchronous send when connection buffer size is not 0. Buffer will be flushed once within specified period if there is no enough messages to make it flush automatically.",
69
-
"12-2": "100",
70
-
"13-1": "Switches between using NIO direct and NIO heap allocation buffers. Although direct buffers perform better, in some cases (especially on Windows) they may cause JVM crashes. If that happens in your environment, set this property to false.",
71
-
"13-2": "true",
72
-
"14-1": "Switches between using NIO direct and NIO heap allocation buffers usage for message sending in asynchronous mode.",
73
-
"14-2": "false",
74
-
"15-1": "Switches between synchronous and asynchronous message sending.\nThis should be set to true (default) if grid nodes send large amount of data over network from multiple threads, however this maybe environment and application specific and we recommend to benchmark the application in both modes.",
75
-
"15-2": "true",
76
-
"16-1": "Sets port which will be used by `IpcSharedMemoryServerEndpoint`. \nNodes started on the same host will communicate over IPC shared memory (only for Linux and MacOS hosts). Set this to -1 to disable IPC shared memory communication.",
77
-
"16-2": "48100",
78
-
"17-1": "Sets receive buffer size for sockets created or accepted by this SPI. If not provided, default is 0 which leaves buffer unchanged after socket creation (i.e. uses Operating System default value).",
79
-
"17-2": "0",
80
-
"18-1": "Sets send buffer size for sockets created or accepted by this SPI. If not provided, default is 0 which leaves the buffer unchanged after socket creation (i.e. uses Operating System default value).",
81
-
"18-2": "0"
61
+
"9-1": "This parameter is used only when `setAsyncSend(boolean)` is set to false. \n\nSets connection buffer size for synchronous connections. Increase buffer size if using synchronous send and sending large amount of small sized messages. However, most of the time this should be set to 0 (default).",
62
+
"9-2": "0",
63
+
"10-1": "Sets the count of selectors to be used in TCP server.",
64
+
"10-2": "Default count of selectors equals to the expression result - \nMath.min(4, Runtime.getRuntime() .availableProcessors())",
65
+
"11-1": "This parameter is used only when `setAsyncSend(boolean)` is set to false. \n\nSets connection buffer flush frequency in milliseconds. This parameter makes sense only for synchronous send when connection buffer size is not 0. Buffer will be flushed once within specified period if there is no enough messages to make it flush automatically.",
66
+
"11-2": "100",
67
+
"12-1": "Switches between using NIO direct and NIO heap allocation buffers. Although direct buffers perform better, in some cases (especially on Windows) they may cause JVM crashes. If that happens in your environment, set this property to false.",
68
+
"12-2": "true",
69
+
"13-1": "Switches between using NIO direct and NIO heap allocation buffers usage for message sending in asynchronous mode.",
70
+
"13-2": "false",
71
+
"14-1": "Switches between synchronous and asynchronous message sending.\nThis should be set to true (default) if grid nodes send large amount of data over network from multiple threads, however this maybe environment and application specific and we recommend to benchmark the application in both modes.",
72
+
"14-2": "true",
73
+
"15-1": "Sets port which will be used by `IpcSharedMemoryServerEndpoint`. \nNodes started on the same host will communicate over IPC shared memory (only for Linux and MacOS hosts). Set this to -1 to disable IPC shared memory communication.",
74
+
"15-2": "48100",
75
+
"16-1": "Sets receive buffer size for sockets created or accepted by this SPI. If not provided, default is 0 which leaves buffer unchanged after socket creation (i.e. uses Operating System default value).",
76
+
"16-2": "0",
77
+
"17-1": "Sets send buffer size for sockets created or accepted by this SPI. If not provided, default is 0 which leaves the buffer unchanged after socket creation (i.e. uses Operating System default value).",
@@ -50,6 +50,6 @@ Sliding windows are configured as Ignite cache eviction policies, and can be tim
50
50
[read more](doc:sliding-windows)
51
51
52
52
##Querying Data
53
-
You can use Ignite full set of Ignite data indexing capabilities, together with Ignite SQL, TEXT, and Predicate based cache queries, to query into the streaming data.
53
+
You can use full set of Ignite data indexing capabilities, together with Ignite SQL, TEXT, and Predicate based cache queries, to query into the streaming data.
In this example we will stream text into Ignite and count each individual word. We will also issue periodic SQL queries into the stream to query top 10 most popular words.
2
+
3
+
The example will work as follows:
4
+
1. We will setup up a cache to hold the words and their counts.
5
+
2. We will setup a 5 second *sliding window* to keep the word counts only for last 5 seconds.
6
+
3.`StreamWords` program will stream text data into Ignite.
7
+
4.`QueryWords` program will query top 10 words out of the stream.
8
+
[block:api-header]
9
+
{
10
+
"type": "basic",
11
+
"title": "Cache Configuration"
12
+
}
13
+
[/block]
14
+
We define a `CacheConfig` class which will provide configuration to be used from both programs, `StreamWords` and `QueryWords`. The cache will use words as keys, and counts for words as values.
15
+
16
+
Note that in this example we use a sliding window of 5 seconds for our cache. This means that words will disappear from cache after 5 seconds since they were first entered into cache.
17
+
[block:code]
18
+
{
19
+
"codes": [
20
+
{
21
+
"code": "public class CacheConfig {\n public static CacheConfiguration<String, Long> wordCache() {\n CacheConfiguration<String, Long> cfg = new CacheConfiguration<>(\"words\");\n\n // Index the words and their counts,\n // so we can use them for fast SQL querying.\n cfg.setIndexedTypes(String.class, Long.class);\n\n // Sliding window of 5 seconds.\n cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(\n new CreatedExpiryPolicy(new Duration(SECONDS, 5))));\n\n return cfg;\n }\n}",
22
+
"language": "java"
23
+
}
24
+
]
25
+
}
26
+
[/block]
27
+
28
+
[block:api-header]
29
+
{
30
+
"type": "basic",
31
+
"title": "Stream Words"
32
+
}
33
+
[/block]
34
+
We define a `StreamWords` class which will be responsible to continuously read words form a local text file ("alice-in-wonderland.txt" in our case) and stream them into Ignite "words" cache.
35
+
36
+
## Streamer Configuration
37
+
1. We set `allowOverwrite` flag to `true` to make sure that existing counts can be updated.
38
+
2. We configure a `StreamTransformer` which takes currently cache count for a word and increments it by 1.
39
+
[block:code]
40
+
{
41
+
"codes": [
42
+
{
43
+
"code": "public class StreamWords {\n public static void main(String[] args) throws Exception {\n // Mark this cluster member as client.\n Ignition.setClientMode(true);\n\n try (Ignite ignite = Ignition.start()) {\n IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());\n\n // Create a streamer to stream words into the cache.\n try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {\n // Allow data updates.\n stmr.allowOverwrite(true);\n\n // Configure data transformation to count instances of the same word.\n stmr.receiver(StreamTransformer.from((e, arg) -> {\n // Get current count.\n Long val = e.getValue();\n\n // Increment current count by 1.\n e.setValue(val == null ? 1L : val + 1);\n\n return null;\n }));\n\n // Stream words from \"alice-in-wonderland\" book.\n while (true) {\n Path path = Paths.get(StreamWords.class.getResource(\"alice-in-wonderland.txt\").toURI());\n\n // Read words from a text file.\n try (Stream<String> lines = Files.lines(path)) {\n lines.forEach(line -> {\n Stream<String> words = Stream.of(line.split(\" \"));\n\n // Stream words into Ignite streamer.\n words.forEach(word -> {\n if (!word.trim().isEmpty())\n stmr.addData(word, 1L);\n });\n });\n }\n }\n }\n }\n }\n}",
44
+
"language": "java"
45
+
}
46
+
]
47
+
}
48
+
[/block]
49
+
50
+
[block:api-header]
51
+
{
52
+
"type": "basic",
53
+
"title": "Query Words"
54
+
}
55
+
[/block]
56
+
We define a `QueryWords` class which will periodically query word counts form the cache.
57
+
58
+
## SQL Query
59
+
1. We use standard SQL to query the counts.
60
+
2. Ignite SQL treats Java classes as SQL tables. Since our counts are stored as simple `Long` type, the SQL query below queries `Long` table.
61
+
3. Ignite always stores cache keys and values as `_key` and `_val` fields, so we use this syntax in our SQL query.
62
+
[block:code]
63
+
{
64
+
"codes": [
65
+
{
66
+
"code": "public class QueryWords {\n public static void main(String[] args) throws Exception {\n // Mark this cluster member as client.\n Ignition.setClientMode(true);\n\n try (Ignite ignite = Ignition.start()) {\n IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());\n\n // Select top 10 words.\n SqlFieldsQuery top10Qry = new SqlFieldsQuery(\n \"select _key, _val from Long order by _val desc limit 10\");\n\n // Query top 10 popular words every 5 seconds.\n while (true) {\n // Execute queries.\n List<List<?>> top10 = stmCache.query(top10Qry).getAll();\n\n // Print top 10 words.\n ExamplesUtils.printQueryResults(top10);\n\n Thread.sleep(5000);\n }\n }\n }\n}",
67
+
"language": "java"
68
+
}
69
+
]
70
+
}
71
+
[/block]
72
+
73
+
[block:api-header]
74
+
{
75
+
"type": "basic",
76
+
"title": "Starting Server Nodes"
77
+
}
78
+
[/block]
79
+
In order to run the example, you need to start data nodes. In Ignite, data nodes are called `server` nodes. You can start as many server nodes as you like, but you should have at least 1 in order to run the example.
80
+
81
+
Server nodes can be started from command line as follows:
82
+
[block:code]
83
+
{
84
+
"codes": [
85
+
{
86
+
"code": "bin/ignite.sh",
87
+
"language": "shell"
88
+
}
89
+
]
90
+
}
91
+
[/block]
92
+
You can also start server nodes programmatically, like so:
93
+
[block:code]
94
+
{
95
+
"codes": [
96
+
{
97
+
"code": "public class ExampleNodeStartup {\n public static void main(String[] args) throws IgniteException {\n Ignition.start();\n }\n}",
0 commit comments