Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 68 additions & 147 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ responsible for the authentication of the logs and forwarding to UTMStack platfo
+ [Remove collector](#remove-collector)
+ [Configuration stream](#configuration-stream)
+ [List collectors](#list-collectors)
+ [List collector's hostnames](#list-collectors-hostnames)
+ [Collector by hostname and module](#collector-by-hostname-and-module)
+ [Send logs to log-auth-proxy](#send-logs-to-log-auth-proxy)
+ [Upsert collector configuration](#upsert-collector-configuration)
- [Important classes](#important-classes)
Expand Down Expand Up @@ -180,7 +178,7 @@ This method is used to remove all information of a collector, including its conf
~~~
import com.utmstack.grpc.service.CollectorService;
import com.utmstack.grpc.exception.CollectorServiceGrpcException;
import agent.CollectorOuterClass.CollectorDelete;
import agent.Common.DeleteRequest;
import agent.Common.AuthResponse;
import com.utmstack.grpc.connection.GrpcConnection;
import com.utmstack.grpc.exception.GrpcConnectionException;
Expand All @@ -200,7 +198,7 @@ String collectorKey = "the collector key";
int collectorId = 1; // the collector's database id
String deletedBy = "a user name, or IP, or hostname"; // Something that indicates who performed the action

CollectorDelete req = CollectorDelete.newBuilder()
DeleteRequest req = DeleteRequest.newBuilder()
.setDeletedBy(deletedBy)
.build();
AuthResponse collector = AuthResponse.newBuilder().setKey(collectorKey)
Expand All @@ -225,7 +223,7 @@ con.getConnectionChannel().shutdown();
[Back to Contents](#contents)<br>
This method is a bidirectional stream used to receive collector's configurations asynchronously from the server and send confirmation back when received.
This method is more complex than the others because needs that you implement you own action when receiving
a configuration, to do that you must create a class that implements the `IExecuteActionOnNext` interface.
a configuration, to do that you must create a class that implements the `IExecuteActionOnNext` and `IExecuteActionOnError` interfaces.
This method try to recover itself after server reconnections, so, you don't have to worry about connect
to server do all over again, but we strongly **_recommend to execute the code below in a separated thread_**
to avoid unwanted interruptions.
Expand All @@ -241,11 +239,16 @@ import com.utmstack.grpc.connection.GrpcConnection;
import com.utmstack.grpc.exception.GrpcConnectionException;
import com.utmstack.grpc.jclient.config.interceptors.impl.GrpcEmptyAuthInterceptor;
import com.utmstack.grpc.service.iface.IExecuteActionOnNext;
import com.utmstack.grpc.service.iface.IExecuteActionOnError;

import com.utmstack.grpc.util.StringUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import io.grpc.stub.StreamObserver;
~~~
<br>**Usage**<br>
Create a class that holds the action to execute when a configuration is received, see an example below.
Create a class that holds the action to execute when a configuration is received and other when stream has errors, see an example below.
~~~
public class OnNextConfiguration implements IExecuteActionOnNext {

Expand All @@ -267,6 +270,23 @@ public class OnNextConfiguration implements IExecuteActionOnNext {
}
~~~
~~~
public class OnErrorCollectorMessagesStream implements IExecuteActionOnError {
private static final Logger logger = LogManager.getLogger(OnErrorCollectorMessagesStream.class);

@Override
public void executeOnError(String message) {
// Log the error message, check if it is server UNAVAILABLE, if true -> try to request configuration on the next execution
if (message.contains("UNAVAILABLE")) {
logger.error("Connecting to the collector configuration stream: " + StringUtil.formatError(message));
} else if (message.contains("ALREADY_EXISTS")) {
logger.info("Collector configuration stream is connected, reconnection is not needed.");
} else {
logger.error("Connecting to the collector configuration stream, server responded with unusual error: " + StringUtil.formatError(message));
}
}
}
~~~
~~~
try {
GrpcConnection con = new GrpcConnection();
con.createChannel(AGENT_MANAGER_HOST, AGENT_MANAGER_PORT, new GrpcEmptyAuthInterceptor());
Expand All @@ -277,6 +297,8 @@ try {
// Authentication information
String collectorKey = "the collector key";
int collectorId = 1; // the collector's database id
AuthResponse collector = AuthResponse.newBuilder().setKey(collectorKey)
.setId(collectorId).build();

// Creating the stream
StreamObserver<CollectorMessages> collectorStreamObserver;
Expand All @@ -286,7 +308,7 @@ try {
while (true) {
try {
// Connecting to the stream
collectorStreamObserver = s.getCollectorStreamObserver(new OnNextConfiguration(),collector);
collectorStreamObserver = s.getCollectorStreamObserver(new OnNextConfiguration(), new OnErrorCollectorMessagesStream(),collector);

// Wait for server response
finishLatch.await(10, TimeUnit.SECONDS);
Expand Down Expand Up @@ -363,106 +385,16 @@ ListCollectorResponse response = serv.listCollector(req, internalKey);
con.getConnectionChannel().shutdown();
~~~

#### List collector's hostnames
[Back to Contents](#contents)<br>
This method is similar than the list collector but only return the hostnames of collectors, the request can be filtered and sorted as needed.
<br>**Imports**
~~~
import com.utmstack.grpc.service.CollectorService;
import com.utmstack.grpc.exception.CollectorServiceGrpcException;
import agent.CollectorOuterClass.CollectorModule;
import agent.CollectorOuterClass.CollectorHostnames;
import agent.Common.ListRequest;
import com.utmstack.grpc.connection.GrpcConnection;
import com.utmstack.grpc.exception.GrpcConnectionException;
import com.utmstack.grpc.jclient.config.interceptors.impl.GrpcEmptyAuthInterceptor;
~~~
<br>**Usage**<br>
~~~
try {
GrpcConnection con = new GrpcConnection();
con.createChannel(AGENT_MANAGER_HOST, AGENT_MANAGER_PORT, new GrpcEmptyAuthInterceptor());

// Instantiating the collector service with a connection to the agent manager
CollectorService serv = new CollectorService(con);

// Authentication information
String internalKey = "the UTMStack's internal key";

ListRequest req = ListRequest.newBuilder()
.setPageNumber(0)
.setPageSize(1000)
.setSearchQuery("module.Is=" + CollectorModule.AS_400.name())
.setSortBy("")
.build()

// List collector's hostnames according to the filters
// CollectorHostnames is an array of Strings containing the hostnames
CollectorHostnames response = serv.ListCollectorHostnames(req, internalKey);

} catch (GrpcConnectionException e) {
// Your exception handling here when the channel can't be created
} catch (CollectorServiceGrpcException e) {
// Your exception handling here when the collector's hostnames can't be listed
}
~~~
**Note:** When you use non-streaming methods like before, ensure that you close the channel with:
~~~
// Close the connection channel
con.getConnectionChannel().shutdown();
~~~


#### Collector by hostname and module
[Back to Contents](#contents)<br>
This method is used to get a collector with its configuration by hostname and module.
<br>**Imports**
~~~
import com.utmstack.grpc.service.CollectorService;
import com.utmstack.grpc.exception.CollectorServiceGrpcException;
import agent.CollectorOuterClass.CollectorModule;
import agent.CollectorOuterClass.CollectorHostnames;
import agent.CollectorOuterClass.FilterByHostAndModule
import agent.Common.ListRequest;
import com.utmstack.grpc.connection.GrpcConnection;
import com.utmstack.grpc.exception.GrpcConnectionException;
import com.utmstack.grpc.jclient.config.interceptors.impl.GrpcEmptyAuthInterceptor;
~~~
<br>**Usage**<br>
~~~
try {
GrpcConnection con = new GrpcConnection();
con.createChannel(AGENT_MANAGER_HOST, AGENT_MANAGER_PORT, new GrpcEmptyAuthInterceptor());

// Instantiating the collector service with a connection to the agent manager
CollectorService serv = new CollectorService(con);

// Authentication information
String internalKey = "the UTMStack's internal key";

FilterByHostAndModule req = FilterByHostAndModule.newBuilder()
.setHostname("Collector's hostname")
.setModule(CollectorModule.AS_400).build();

// Receiving the collector information, including configurations
ListCollectorResponse response = serv.GetCollectorsByHostnameAndModule(req, internalKey);

} catch (GrpcConnectionException e) {
// Your exception handling here when the channel can't be created
} catch (CollectorServiceGrpcException e) {
// Your exception handling here when the collector's information can't be listed
}
~~~
**Note:** When you use non-streaming methods like before, ensure that you close the channel with:
~~~
// Close the connection channel
con.getConnectionChannel().shutdown();
~~~


#### Send logs to log-auth-proxy
[Back to Contents](#contents)<br>
This method is used to send logs from a collector to [log-auth-proxy](#description).
This method is used to send logs from a collector to [log-auth-proxy](#description). This method is a bidirectional stream used to
send logs to server and receive the acknowledgment when received by server.
This method is more complex than the others because needs that you implement you own action for the acknowledgment, to do
that you must create a class that implements the `OnNextLogsAck` interface.
We strongly **_recommend to execute the code below in a separated thread_**
to avoid unwanted interruptions.

<br>**Imports**
~~~
import agent.Common;
Expand All @@ -489,7 +421,10 @@ LogMessagingService serv = new LogMessagingService(con);


// Authentication information
String collectorKey = "the collector's key";
String collectorKey = "the collector key";
int collectorId = 1; // the collector's database id
AuthResponse collector = AuthResponse.newBuilder().setKey(collectorKey)
.setId(collectorId).build();

// Creating the stream used to send logs to server
StreamObserver<Plugins.Log> logStreamObserver = serv.getLogsStreamObserver(new OnNextLogsAck(),collector);
Expand All @@ -503,7 +438,6 @@ String collectorKey = "the collector's key";
.setId(UUID.randomUUID().toString())
.setDataSource("hostname, IP or some datasource identifier")
.setDataType(DATA_TYPE)
.setTimestamp(ConfigVerification.getActualTime())
.setRaw(message)
.build();
logStreamObserver.onNext(log);
Expand Down Expand Up @@ -706,11 +640,9 @@ package agent;

service CollectorService {
rpc RegisterCollector(RegisterRequest) returns (AuthResponse) {}
rpc DeleteCollector(CollectorDelete) returns (AuthResponse) {}
rpc DeleteCollector(DeleteRequest) returns (AuthResponse) {}
rpc ListCollector (ListRequest) returns (ListCollectorResponse) {}
rpc CollectorStream(stream CollectorMessages) returns (stream CollectorMessages) {}
rpc ListCollectorHostnames (ListRequest) returns (CollectorHostnames) {}
rpc GetCollectorsByHostnameAndModule (FilterByHostAndModule) returns (ListCollectorResponse) {}
rpc GetCollectorConfig (ConfigRequest) returns (CollectorConfig) {}
}

Expand All @@ -722,36 +654,16 @@ enum CollectorModule{
AS_400 = 0;
}

message CollectorMessages {
oneof stream_message {
CollectorConfig config = 1;
ConfigKnowledge result = 2;
}
}

message CollectorHostnames{
repeated string hostname = 1;
}

message FilterByHostAndModule{
string hostname = 1;
CollectorModule module = 2;
}

message ConfigKnowledge{
string accepted = 1;
string request_id = 2;
}

message RegisterRequest {
string ip = 1;
string hostname = 2;
string version = 3;
CollectorModule collector = 4;
}

message ConfigRequest {
CollectorModule module = 1;
message ListCollectorResponse {
repeated Collector rows = 1;
int32 total = 2;
}

message Collector {
Expand All @@ -762,25 +674,32 @@ message Collector {
string hostname = 5;
string version = 6;
CollectorModule module = 7;
repeated CollectorConfigGroup groups = 8;
string last_seen = 9;
string last_seen = 8;
}

message CollectorMessages {
oneof stream_message {
CollectorConfig config = 1;
ConfigKnowledge result = 2;
}
}

message CollectorConfig {
string collector_key = 1;
string collector_id = 1;
repeated CollectorConfigGroup groups = 2;
string request_id = 3;
}

message CollectorConfigGroup {
int32 id = 1;
string group_name = 3;
string group_description = 4;
repeated CollectorGroupConfigurations configurations = 5;
int32 collector_id = 6;
string group_name = 2;
string group_description = 3;
repeated CollectorGroupConfigurations configurations = 4;
int32 collector_id = 5;
}

message CollectorGroupConfigurations {
int32 id = 1;
int32 group_id = 2;
string conf_key = 3;
string conf_value = 4;
Expand All @@ -790,14 +709,15 @@ message CollectorGroupConfigurations {
bool conf_required = 8;
}

message CollectorDelete {
string deleted_by = 1;
message ConfigKnowledge{
string accepted = 1;
string request_id = 2;
}

message ListCollectorResponse {
repeated Collector rows = 1;
int32 total = 2;
message ConfigRequest {
CollectorModule module = 1;
}

~~~

#### Logs
Expand All @@ -821,7 +741,7 @@ message Log {
string id = 1;
string dataType = 2;
string dataSource = 3;
string timestamp = 4;
string timestamp = 4 [json_name="@timestamp"];
string tenantId = 5;
string raw = 6;
}
Expand Down Expand Up @@ -890,7 +810,8 @@ message AuthResponse {
string key = 2;
}

message Hostname{
string hostname = 1;
message DeleteRequest {
string deleted_by = 1;
}

~~~
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.utmstack.grpc.jclient</groupId>
<artifactId>collector-client-4j</artifactId>
<version>1.2.10</version>
<version>1.2.13</version>
<packaging>jar</packaging>
<name>UTMStack Agent Collector Client for Java projects</name>

Expand Down
Loading