Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package gov.cdc.datacompareapis.configuration;

import gov.cdc.datacompareapis.service.interfaces.IStorageDataService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Selects the primary {@link IStorageDataService} implementation (AWS S3 or
 * Azure Blob) based on the {@code cloud.provider} property.
 *
 * <p>Both candidate beans are injected as optional because each implementation
 * is registered conditionally on the same property ({@code @ConditionalOnProperty}
 * on the service classes), so at most one of them exists in the context.
 * Any value other than {@code AZURE} falls back to AWS; an unrecognized value
 * is logged as a warning instead of being silently treated as AWS.
 */
@Configuration
public class StorageConfiguration {

private static final Logger logger = LoggerFactory.getLogger(StorageConfiguration.class);

// Cloud provider selector; defaults to AWS when the property is absent.
@Value("${cloud.provider:AWS}")
private String cloudProvider;

/**
 * Exposes the storage service matching {@code cloud.provider} as the primary bean.
 *
 * @param awsS3Service     AWS S3 implementation; present only when provider is AWS (default)
 * @param azureBlobService Azure Blob implementation; present only when provider is AZURE
 * @return the selected storage service
 * @throws IllegalStateException if the selected implementation is not available in the context
 */
@Bean
@Primary
public IStorageDataService storageDataService(
@Autowired(required = false) @Qualifier("awsS3") IStorageDataService awsS3Service,
@Autowired(required = false) @Qualifier("azureBlob") IStorageDataService azureBlobService) {

logger.info("=== STORAGE PROVIDER CONFIGURATION (APIs) ===");
logger.info("Configured cloud provider: {}", cloudProvider);

if ("AZURE".equalsIgnoreCase(cloudProvider)) {
if (azureBlobService == null) {
logger.error("Azure Blob service is not available but AZURE provider is configured!");
throw new IllegalStateException("Azure Blob service is not available. Check your configuration.");
}
logger.info("Selected Azure Blob Storage service as primary storage provider");
logger.info("Azure Blob service class: {}", azureBlobService.getClass().getSimpleName());
logger.info("=== END STORAGE PROVIDER CONFIGURATION (APIs) ===");
return azureBlobService;
}

// Default to AWS. Warn when the configured value is not the expected literal,
// so a typo (e.g. "AZRUE") does not silently select the wrong provider.
if (!"AWS".equalsIgnoreCase(cloudProvider)) {
logger.warn("Unrecognized cloud provider '{}'; defaulting to AWS", cloudProvider);
}
if (awsS3Service == null) {
logger.error("AWS S3 service is not available!");
throw new IllegalStateException("AWS S3 service is not available. Check your configuration.");
}
logger.info("Selected AWS S3 service as primary storage provider (default)");
logger.info("AWS S3 service class: {}", awsS3Service.getClass().getSimpleName());
logger.info("=== END STORAGE PROVIDER CONFIGURATION (APIs) ===");

return awsS3Service;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

Expand All @@ -24,6 +25,7 @@
import static gov.cdc.datacompareapis.constant.ConstantValue.LOG_SUCCESS;

@Service("azureBlob")
@ConditionalOnProperty(name = "cloud.provider", havingValue = "AZURE", matchIfMissing = false)
public class AzureBlobDataService implements IStorageDataService {
private static final Logger logger = LoggerFactory.getLogger(AzureBlobDataService.class);

Expand Down Expand Up @@ -87,6 +89,8 @@ public AzureBlobDataService(BlobServiceClient blobServiceClient) {
* DOMAIN/TABLE/TIMESTAMP/TABLE_INDEX
*/
public String persistMultiPart(String domain, String records, String fileName, Timestamp persistingTimestamp, int index) {
logger.debug("AzureBlobDataService: Persisting data to Azure Blob - domain: {}, fileName: {}, index: {}, data size: {} bytes",
domain, fileName, index, records.length());
String log = LOG_SUCCESS;
try {
if (records.equalsIgnoreCase("[]") || records.isEmpty()) {
Expand All @@ -107,9 +111,9 @@ public String persistMultiPart(String domain, String records, String fileName, T
BinaryData binaryData = BinaryData.fromString(records);
blobClient.upload(binaryData, true);

logger.info("Successfully uploaded data to Azure Blob Storage: {}", blobName);
logger.debug("AzureBlobDataService: Successfully persisted data to Azure Blob Storage: {}", blobName);
} catch (Exception e) {
logger.error("Error persisting data to Azure Blob Storage: {}", e.getMessage(), e);
logger.error("AzureBlobDataService: Error persisting data to Azure Blob Storage: {}", e.getMessage(), e);
log = e.getMessage();
}
return log;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class DataPullerService implements IDataPullerService {
private final JdbcTemplate rdbJdbcTemplate;
private final JdbcTemplate rdbModernJdbcTemplate;
private final JdbcTemplate odseJdbcTemplate;
private final IStorageDataService s3DataService;
private final IStorageDataService storageDataService;
private final KafkaPropertiesProvider kafkaPropertiesProvider;
private final Executor chunkTaskExecutor;
private long batchId;
Expand All @@ -62,19 +62,24 @@ public DataPullerService(DataCompareConfigRepository dataCompareConfigRepository
@Qualifier("rdbModernJdbcTemplate") JdbcTemplate rdbModernJdbcTemplate,
KafkaPropertiesProvider kafkaPropertiesProvider,
@Qualifier("odseJdbcTemplate") JdbcTemplate odseJdbcTemplate,
@Qualifier("awsS3") IStorageDataService s3DataService ,
DataCompareBatchRepository dataCompareBatchRepository,
DataCompareBatchRepository dataCompareBatchRepository, IStorageDataService storageDataService,
@Qualifier("chunkTaskExecutor") Executor chunkTaskExecutor) {
this.dataCompareConfigRepository = dataCompareConfigRepository;
this.dataCompareLogRepository = dataCompareLogRepository;
this.kafkaProducerService = kafkaProducerService;
this.rdbJdbcTemplate = rdbJdbcTemplate;
this.rdbModernJdbcTemplate = rdbModernJdbcTemplate;
this.odseJdbcTemplate = odseJdbcTemplate;
this.s3DataService = s3DataService;
this.dataCompareBatchRepository = dataCompareBatchRepository;
this.kafkaPropertiesProvider = kafkaPropertiesProvider;
this.storageDataService = storageDataService;
this.chunkTaskExecutor = chunkTaskExecutor;

// Log which storage service is being used
logger.info("=== DATA PULLER SERVICE INITIALIZATION ===");
logger.info("Injected storage service: {}", storageDataService.getClass().getSimpleName());
logger.info("Storage service implementation: {}", storageDataService.getClass().getName());
logger.info("=== END DATA PULLER SERVICE INITIALIZATION ===");
this.gson = new GsonBuilder()
.registerTypeAdapter(Timestamp.class, TimestampAdapter.getTimestampSerializer())
.registerTypeAdapter(Timestamp.class, TimestampAdapter.getTimestampDeserializer())
Expand Down Expand Up @@ -380,7 +385,7 @@ private CompletableFuture<Void> processRdbPagesParallel(int totalPages, int pull
new SimpleDateFormat("yyyyMMddHHmmss").format(currentTime),
config.getTableName(), pageIndex);

String uploadResult = s3DataService.persistMultiPart(dbType, rawJsonData,
String uploadResult = storageDataService.persistMultiPart(dbType, rawJsonData,
config.getTableName(), currentTime, pageIndex);

if (!uploadResult.equals(LOG_SUCCESS)) {
Expand Down Expand Up @@ -469,7 +474,7 @@ private CompletableFuture<Void> processOdsePagesParallel(int totalPages, int pul
new SimpleDateFormat("yyyyMMddHHmmss").format(currentTime),
tableName, pageIndex);

String uploadResult = s3DataService.persistMultiPart(dbType, rawJsonData,
String uploadResult = storageDataService.persistMultiPart(dbType, rawJsonData,
tableName, currentTime, pageIndex);

if (!uploadResult.equals(LOG_SUCCESS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
Expand All @@ -26,6 +27,7 @@
import static gov.cdc.datacompareapis.constant.ConstantValue.LOG_SUCCESS;

@Service("awsS3")
@ConditionalOnProperty(name = "cloud.provider", havingValue = "AWS", matchIfMissing = true)
public class S3DataService implements IStorageDataService {
private static Logger logger = LoggerFactory.getLogger(S3DataService.class);

Expand Down Expand Up @@ -80,6 +82,8 @@ public S3DataService(S3Client s3Client) {
* DOMAIN/TABLE/TIMESTAMP/TABLE_INDEX
* */
public String persistMultiPart(String domain, String records, String fileName, Timestamp persistingTimestamp, int index) {
logger.debug("S3DataService: Persisting data to S3 - domain: {}, fileName: {}, index: {}, data size: {} bytes",
domain, fileName, index, records.length());
String log = LOG_SUCCESS;
try {
if (records.equalsIgnoreCase("[]") || records.isEmpty()) {
Expand Down Expand Up @@ -123,10 +127,11 @@ public String persistMultiPart(String domain, String records, String fileName, T
}

completeMultipartUpload(uploadId, s3Key, completedParts);
logger.debug("S3DataService: Successfully persisted data to S3 - s3Key: {}", s3Key);
}
catch (Exception e)
{
logger.info(e.getMessage());
logger.error("S3DataService: Error persisting data to S3: {}", e.getMessage());
log = e.getMessage();
}
return log;
Expand Down
3 changes: 2 additions & 1 deletion DataCompareAPIs/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ spring:
auth:
token-uri: ${spring.security.oauth2.resourceserver.jwt.issuer-uri}/protocol/openid-connect/token
introspect-uri: ${spring.security.oauth2.resourceserver.jwt.issuer-uri}/protocol/openid-connect/token/introspect

cloud:
provider: ${CLOUD_PROVIDER:AWS}
aws:
auth:
static:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package gov.cdc.datacompareprocessor.configuration;

import gov.cdc.datacompareprocessor.service.interfaces.IStorageDataPullerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Selects the primary {@link IStorageDataPullerService} implementation (AWS S3
 * or Azure Blob) based on the {@code cloud.provider} property.
 *
 * <p>Both candidate beans are injected as optional because each implementation
 * is registered conditionally on the same property ({@code @ConditionalOnProperty}
 * on the service classes), so at most one of them exists in the context.
 * Any value other than {@code AZURE} falls back to AWS; an unrecognized value
 * is logged as a warning instead of being silently treated as AWS.
 */
@Configuration
public class StorageConfiguration {

private static final Logger logger = LoggerFactory.getLogger(StorageConfiguration.class);

// Cloud provider selector; defaults to AWS when the property is absent.
@Value("${cloud.provider:AWS}")
private String cloudProvider;

/**
 * Exposes the storage service matching {@code cloud.provider} as the primary bean.
 *
 * @param awsS3Service     AWS S3 implementation; present only when provider is AWS (default)
 * @param azureBlobService Azure Blob implementation; present only when provider is AZURE
 * @return the selected storage service
 * @throws IllegalStateException if the selected implementation is not available in the context
 */
@Bean
@Primary
public IStorageDataPullerService storageDataPullerService(
@Autowired(required = false) @Qualifier("awsS3") IStorageDataPullerService awsS3Service,
@Autowired(required = false) @Qualifier("azureBlob") IStorageDataPullerService azureBlobService) {

logger.info("=== STORAGE PROVIDER CONFIGURATION ===");
logger.info("Configured cloud provider: {}", cloudProvider);

if ("AZURE".equalsIgnoreCase(cloudProvider)) {
if (azureBlobService == null) {
logger.error("Azure Blob service is not available but AZURE provider is configured!");
throw new IllegalStateException("Azure Blob service is not available. Check your configuration.");
}
logger.info("Selected Azure Blob Storage service as primary storage provider");
logger.info("Azure Blob service class: {}", azureBlobService.getClass().getSimpleName());
logger.info("=== END STORAGE PROVIDER CONFIGURATION ===");
return azureBlobService;
}

// Default to AWS. Warn when the configured value is not the expected literal,
// so a typo (e.g. "AZRUE") does not silently select the wrong provider.
if (!"AWS".equalsIgnoreCase(cloudProvider)) {
logger.warn("Unrecognized cloud provider '{}'; defaulting to AWS", cloudProvider);
}
if (awsS3Service == null) {
logger.error("AWS S3 service is not available!");
throw new IllegalStateException("AWS S3 service is not available. Check your configuration.");
}
logger.info("Selected AWS S3 service as primary storage provider (default)");
logger.info("AWS S3 service class: {}", awsS3Service.getClass().getSimpleName());
logger.info("=== END STORAGE PROVIDER CONFIGURATION ===");

return awsS3Service;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.sql.Timestamp;

@Service("azureBlob")
@ConditionalOnProperty(name = "cloud.provider", havingValue = "AZURE", matchIfMissing = false)
public class AzureBlobDataPullerService implements IStorageDataPullerService {
private static final Logger logger = LoggerFactory.getLogger(AzureBlobDataPullerService.class);

Expand Down Expand Up @@ -99,6 +101,7 @@ public AzureBlobDataPullerService(BlobServiceClient blobServiceClient) {


public JsonElement readJsonFromStorage(String fileName) {
logger.debug("AzureBlobDataPullerService: Reading JSON from Azure Blob - fileName: {}", fileName);
try {
// Get container client
BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(containerName);
Expand All @@ -111,6 +114,7 @@ public JsonElement readJsonFromStorage(String fileName) {
String jsonData = binaryData.toString();

// Parse JSON string to JsonElement
logger.debug("AzureBlobDataPullerService: Successfully read {} bytes from Azure Blob", jsonData.length());
return JsonParser.parseString(jsonData);
} catch (Exception e) {
logger.error("Azure Blob Read Error: {}, {}", fileName, e.getMessage());
Expand All @@ -120,6 +124,7 @@ public JsonElement readJsonFromStorage(String fileName) {

public String uploadDataToStorage(String folder1, String folder2, String folder3, String folder4, String fileName, String data) {
String blobName = String.format("%s/%s/%s/%s/%s", folder1, folder2, folder3, folder4, fileName);
logger.debug("AzureBlobDataPullerService: Uploading data to Azure Blob - blobName: {}, data size: {} bytes", blobName, data.length());

try {
// Get container client
Expand All @@ -132,7 +137,7 @@ public String uploadDataToStorage(String folder1, String folder2, String folder3
BinaryData binaryData = BinaryData.fromString(data);
blobClient.upload(binaryData, true);

logger.info("Successfully uploaded data to Azure Blob Storage: {}", blobName);
logger.debug("AzureBlobDataPullerService: Successfully uploaded data to Azure Blob Storage: {}", blobName);
} catch (Exception e) {
logger.error("Azure Blob Write Error: {}, {}", blobName, e.getMessage());
}
Expand Down
Loading