Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-21105] Collecting Audit Log Meta Data independently at ACH's readChannel's finally and AuditLogSetterHook. And Create AuditLogRequest in write/close call. #15790

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@
import io.cdap.cdap.security.spi.authentication.SecurityRequestContext;
import io.cdap.cdap.security.spi.authorization.AuditLogRequest;
import io.cdap.http.AbstractHandlerHook;
import io.cdap.http.HttpResponder;
import io.cdap.http.internal.HandlerInfo;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Sets
* Sets audit log metadata to {@link SecurityRequestContext}.
*/
public class AuditLogSetterHook extends AbstractHandlerHook {

Expand All @@ -52,7 +48,7 @@ public AuditLogSetterHook(CConfiguration cConf, String serviceName) {

@Override
public void postCall(HttpRequest request, HttpResponseStatus status, HandlerInfo handlerInfo) {
if (!Feature.DATAPLANE_AUDIT_LOGGING.isEnabled(featureFlagsProvider)){
if (!Feature.DATAPLANE_AUDIT_LOGGING.isEnabled(featureFlagsProvider)) {
return;
}

Expand All @@ -68,19 +64,17 @@ public void postCall(HttpRequest request, HttpResponseStatus status, HandlerInfo
long startTime = endTime - (endTimeNanos - startTimeNanos);

LOG.trace("Setting a Audit Log event to published in SecurityRequestContext");
SecurityRequestContext.setAuditLogRequest(
new AuditLogRequest(
status.code(),
SecurityRequestContext.getUserIp(),
request.uri(),
getSimpleName(handlerInfo.getHandlerName()),
handlerInfo.getMethodName(),
request.method().name(),
SecurityRequestContext.getAuditLogQueue(),
startTime,
endTime
)
);

AuditLogRequest.Builder auditLogRequestBuilder = new AuditLogRequest.Builder()
.operationResponseCode(status.code())
.uri(request.uri())
.handler(getSimpleName(handlerInfo.getHandlerName()))
.method(handlerInfo.getMethodName())
.methodType(request.method().name())
.startTimeNanos(startTime)
.endTimeNanos(endTime);

SecurityRequestContext.setAuditLogRequestBuilder(auditLogRequestBuilder);
}

private String getSimpleName(String className) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Queue;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An UpstreamHandler that verifies the userId in a request header and updates the {@code
Expand All @@ -53,7 +54,9 @@ public class AuthenticationChannelHandler extends ChannelDuplexHandler {
"CDAP-empty-user-credential",
Credential.CredentialType.INTERNAL);
private static final String EMPTY_USER_IP = "CDAP-empty-user-ip";
private static final String AUDIT_LOG_REQ_ATTR_NAME = "AUDIT_LOG_REQUEST";
static final String AUDIT_LOG_REQ_BUILDER_ATTR = "AUDIT_LOG_REQ_BUILDER";
static final String AUDIT_LOG_USER_IP_ATTR = "AUDIT_LOG_USER_IP";
static final String AUDIT_LOG_CONTEXT_QUEUE_ATTR = "AUDIT_LOG_CONTEXT_QUEUE";

private final boolean internalAuthEnabled;
private final boolean auditLoggingEnabled;
Expand Down Expand Up @@ -130,22 +133,21 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
SecurityRequestContext.setUserId(currentUserId);
SecurityRequestContext.setUserCredential(currentUserCredential);
SecurityRequestContext.setUserIp(currentUserIp);
//Also set userIp in ATTR , to be used in audit logging incase it was replaced at a later stage
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_USER_IP_ATTR)).set(currentUserIp);
}

try {
ctx.fireChannelRead(msg);
} finally {
// Set the audit log info onto the ongoing channel so it is ensured to be reused later in the same channel, making
// it independent of Thread local.
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQ_ATTR_NAME))
.set(SecurityRequestContext.getAuditLogRequest());
setAuditLogMetaDataInChannel(ctx);
SecurityRequestContext.reset();
}
}

/**
* If Audit logging is enabled then it sends the collection of audit events stored in {@link SecurityRequestContext}
* to get stored in a messaging system.
* Or Attribute of channel to get stored in a messaging system.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Expand Down Expand Up @@ -178,10 +180,86 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
* It's not null, publish it and set it to Null.
*/
private void publishAuditLogRequest(ChannelHandlerContext ctx) throws IOException {
Object auditLogRequestObj = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQ_ATTR_NAME)).get();
if (auditLoggingEnabled && auditLogRequestObj != null) {
auditLogWriter.publish((AuditLogRequest) auditLogRequestObj);
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQ_ATTR_NAME)).set(null);
if (!auditLoggingEnabled) {
return;
}
AuditLogRequest auditLogRequest = getAuditLogRequest(ctx);
if (auditLogRequest != null) {
auditLogWriter.publish(auditLogRequest);
}
}


@Nullable
private AuditLogRequest getAuditLogRequest(ChannelHandlerContext ctx) {

Object auditLogContextsQueueAttr =
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_CONTEXT_QUEUE_ATTR)).get();

// If NO audit logs, then return NULL.
if (auditLogContextsQueueAttr == null) {
return null;
}

Queue<AuditLogContext> auditLogContextsQueue = (Queue<AuditLogContext>) auditLogContextsQueueAttr;

Object userIpObj = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_USER_IP_ATTR)).get();
String userIp = userIpObj == null ? SecurityRequestContext.getUserIp() : (String) userIpObj;

// Check Attr for AuditLogRequest Builder.
Object builderObj = ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQ_BUILDER_ATTR)).get();
AuditLogRequest.Builder builder = builderObj == null ? SecurityRequestContext.getAuditLogRequestBuilder() :
(AuditLogRequest.Builder) builderObj;

//If the AuditLogContextsQueue is NOT empty, then the AuditLogRequest.Builder should be NEVER be null.
//It should either come from ATTR of pipeline or SecurityRequestContext.
//Ideally we will never encounter this.
if (builder == null) {
LOG.error("The meta data required to publish audit logs are missing or null. Following Audit logs will be "
+ "skipped: {}", auditLogContextsQueue.stream().map(String::valueOf)
.collect(Collectors.joining(", ")));
//Returning null as Operation is already completed.
return null;
}

//Clear Attributes
clearAttributes(ctx);

return builder
.userIp(userIp)
.auditLogContextQueue(auditLogContextsQueue)
.build();
}

/**
* Stores metadata from SecurityRequestContext inside channelRead's Finally method.
*/
private void setAuditLogMetaDataInChannel(ChannelHandlerContext ctx) {

if (!auditLoggingEnabled) {
return;
}

Queue<AuditLogContext> auditLogContextQueue = SecurityRequestContext.getAuditLogQueue();

// In either case, if Queue from SecurityRequestContext is empty, then no Audit Logs.
if (!auditLogContextQueue.isEmpty()) {
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_CONTEXT_QUEUE_ATTR))
.set(auditLogContextQueue);
}

// Store all audit metadata info stored in AuditLogRequest.Builder in ATTR from AuditLogSetterHook#postCall
// This is just to ensure we don't lose metadata information if already populated because of some RESET call on
// SecurityRequestContext. This Also ensures that if Thread changes in Close / Write , then this info is preserved.
AuditLogRequest.Builder builder = SecurityRequestContext.getAuditLogRequestBuilder();
if (builder != null) {
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQ_BUILDER_ATTR)).set(builder);
}
}

private void clearAttributes(ChannelHandlerContext ctx) {
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_REQ_BUILDER_ATTR)).set(null);
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_USER_IP_ATTR)).set(null);
ctx.channel().attr(AttributeKey.valueOf(AUDIT_LOG_CONTEXT_QUEUE_ATTR)).set(null);
}
}
Loading
Loading