1
- ## Copyright (c) 2021, 2022 Oracle and/or its affiliates.
1
+ ## Copyright (c) 2021, 2024 Oracle and/or its affiliates.
2
2
## The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl/
3
3
4
4
require 'fluent/plugin/output'
7
7
require 'yajl'
8
8
require 'yajl/json_gem'
9
9
10
+ # require 'tzinfo'
10
11
require 'logger'
11
12
require_relative '../dto/logEventsJson'
12
13
require_relative '../dto/logEvents'
13
14
require_relative '../metrics/prometheusMetrics'
14
15
require_relative '../metrics/metricsLabels'
16
+ require_relative '../enums/source'
15
17
16
18
# Import only specific OCI modules to improve load times and reduce the memory requirements.
17
19
require 'oci/auth/auth'
36
38
require 'oci/waiter'
37
39
require 'oci/retry/retry'
38
40
require 'oci/object_storage/object_storage'
39
-
40
41
module OCI
41
42
class << self
42
43
attr_accessor :sdk_name
@@ -97,7 +98,8 @@ class OutOracleOCILogAnalytics < Output
97
98
config_param :zip_file_location , :string , :default => nil
98
99
desc 'The kubernetes_metadata_keys_mapping.'
99
100
config_param :kubernetes_metadata_keys_mapping , :hash , :default => { "container_name" :"Container" , "namespace_name" :"Namespace" , "pod_name" :"Pod" , "container_image" :"Container Image Name" , "host" :"Node" }
100
-
101
+ desc 'opc-meta-properties'
102
+ config_param :collection_source , :string , :default => Source ::FLUENTD
101
103
102
104
#****************************************************************
103
105
desc 'The http proxy to be used.'
@@ -256,6 +258,14 @@ def initialize_loganalytics_client()
256
258
else
257
259
@@loganalytics_client = OCI ::LogAnalytics ::LogAnalyticsClient . new ( config : OCI ::Config . new , signer : instance_principals_signer )
258
260
end
261
+ when "WorkloadIdentity"
262
+ workload_identity_signer = OCI ::Auth ::Signers ::oke_workload_resource_principal_signer
263
+ if is_valid ( @endpoint )
264
+ @@loganalytics_client = OCI ::LogAnalytics ::LogAnalyticsClient . new ( config : OCI ::Config . new , endpoint : @endpoint , signer : workload_identity_signer )
265
+ @@logger . info { "loganalytics_client initialised with endpoint: #{ @endpoint } " }
266
+ else
267
+ @@loganalytics_client = OCI ::LogAnalytics ::LogAnalyticsClient . new ( config : OCI ::Config . new , signer : workload_identity_signer )
268
+ end
259
269
when "ConfigFile"
260
270
my_config = OCI ::ConfigFileLoader . load_config ( config_file_location : @config_file_location , profile_name : @profile_name )
261
271
if is_valid ( @endpoint )
@@ -628,6 +638,8 @@ def group_by_logGroupId(chunk)
628
638
latency = 0
629
639
records_per_tag = 0
630
640
641
+
642
+
631
643
tag_metrics_set = Hash . new
632
644
logGroup_labels_set = Hash . new
633
645
@@ -637,8 +649,8 @@ def group_by_logGroupId(chunk)
637
649
tags_per_logGroupId = Hash . new
638
650
tag_logSet_map = Hash . new
639
651
tag_metadata_map = Hash . new
652
+ timezoneValuesByTag = Hash . new
640
653
incoming_records = 0
641
-
642
654
chunk . each do |time , record |
643
655
incoming_records += 1
644
656
metricsLabels = MetricsLabels . new
@@ -722,6 +734,8 @@ def group_by_logGroupId(chunk)
722
734
end
723
735
next
724
736
end
737
+
738
+ # metricsLabels.timezone = record["oci_la_timezone"]
725
739
metricsLabels . logGroupId = record [ "oci_la_log_group_id" ]
726
740
metricsLabels . logSourceName = record [ "oci_la_log_source_name" ]
727
741
if record [ "oci_la_log_set" ] != nil
@@ -770,6 +784,25 @@ def group_by_logGroupId(chunk)
770
784
tags_per_logGroupId [ record [ "oci_la_log_group_id" ] ] = record [ "tag" ]
771
785
end
772
786
end
787
+ # validating the timezone field
788
+ if !timezoneValuesByTag . has_key? ( record [ "tag" ] )
789
+ begin
790
+ timezoneIdentifier = record [ "oci_la_timezone" ]
791
+ unless is_valid ( timezoneIdentifier )
792
+ record [ "oci_la_timezone" ] = nil
793
+ else
794
+ isTimezoneExist = timezone_exist? timezoneIdentifier
795
+ unless isTimezoneExist
796
+ @@logger . warn { "Invalid timezone '#{ timezoneIdentifier } ', using default UTC." }
797
+ record [ "oci_la_timezone" ] = "UTC"
798
+ end
799
+
800
+ end
801
+ timezoneValuesByTag [ record [ "tag" ] ] = record [ "oci_la_timezone" ]
802
+ end
803
+ else
804
+ record [ "oci_la_timezone" ] = timezoneValuesByTag [ record [ "tag" ] ]
805
+ end
773
806
774
807
records << record
775
808
ensure
@@ -916,6 +949,14 @@ def write(chunk)
916
949
end
917
950
end
918
951
end
952
+ def timezone_exist? ( tz )
953
+ begin
954
+ TZInfo ::Timezone . get ( tz )
955
+ return true
956
+ rescue TZInfo ::InvalidTimezoneIdentifier
957
+ return false
958
+ end
959
+ end
919
960
920
961
# Each oci_la_log_set will correspond to a separate file in the zip
921
962
# Only MAX_FILES_PER_ZIP files are allowed per zip.
@@ -958,6 +999,21 @@ def get_logSets_map_per_logGroupId(oci_la_log_group_id,records_per_logGroupId)
958
999
959
1000
# takes a fluentD chunk and converts it to an in-memory zipfile, populating metrics hash provided
960
1001
# Any exception raised is passed into the metrics hash, to be re-thrown from write()
1002
+ def getCollectionSource ( input )
1003
+ collections_src = [ ]
1004
+ if !is_valid input
1005
+ collections_src . unshift ( "source:#{ Source ::FLUENTD } " )
1006
+ else
1007
+ if input == Source ::FLUENTD . to_s or input == Source ::KUBERNETES_SOLUTION . to_s
1008
+ collections_src . unshift ( "source:#{ input } " )
1009
+ else
1010
+ # source not define ! using default source 'fluentd'
1011
+ collections_src . unshift ( "source:#{ Source ::FLUENTD } " )
1012
+ end
1013
+ end
1014
+ collections_src
1015
+ end
1016
+
961
1017
def get_zipped_stream ( oci_la_log_group_id , oci_la_global_metadata , records_per_logSet_map )
962
1018
begin
963
1019
current , = Time . now
@@ -970,8 +1026,9 @@ def get_zipped_stream(oci_la_log_group_id,oci_la_global_metadata,records_per_log
970
1026
record [ 'oci_la_metadata' ] ,
971
1027
record [ 'oci_la_entity_id' ] ,
972
1028
record [ 'oci_la_entity_type' ] ,
973
- record [ 'oci_la_log_source_name' ] ,
974
- record [ 'oci_la_log_path' ]
1029
+ record [ 'oci_la_log_source_name' ] ,
1030
+ record [ 'oci_la_log_path' ] ,
1031
+ record [ 'oci_la_timezone' ]
975
1032
] } . map { |lrpe_key , records_per_lrpe |
976
1033
number_of_records += records_per_lrpe . length
977
1034
LogEvents . new ( lrpe_key , records_per_lrpe )
@@ -1021,9 +1078,10 @@ def save_zip_to_local(oci_la_log_group_id, zippedstream, current_s)
1021
1078
# upload zipped stream to oci
1022
1079
def upload_to_oci ( oci_la_log_group_id , number_of_records , zippedstream , metricsLabels_array )
1023
1080
begin
1081
+ collection_src_prop = getCollectionSource @collection_source
1024
1082
error_reason = nil
1025
1083
error_code = nil
1026
- opts = { payload_type : "ZIP" }
1084
+ opts = { payload_type : "ZIP" , opc_meta_properties : collection_src_prop }
1027
1085
1028
1086
response = @@loganalytics_client . upload_log_events_file ( namespace_name = @namespace ,
1029
1087
logGroupId = oci_la_log_group_id ,
0 commit comments