supporting blog content: onelake connector part II #445


Merged
230 changes: 230 additions & 0 deletions supporting-blog-content/onelake-connector-part-ii/connectors/config.py
@@ -0,0 +1,230 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

import os

from envyaml import EnvYAML

from connectors.logger import logger

DEFAULT_ELASTICSEARCH_MAX_RETRIES = 5
DEFAULT_ELASTICSEARCH_RETRY_INTERVAL = 10

DEFAULT_MAX_FILE_SIZE = 10485760  # 10MB


def load_config(config_file):
    logger.info(f"Loading config from {config_file}")
    yaml_config = EnvYAML(config_file, flatten=False).export()
    nested_yaml_config = {}
    for key, value in yaml_config.items():
        _nest_configs(nested_yaml_config, key, value)
    configuration = dict(_merge_dicts(_default_config(), nested_yaml_config))
    _ent_search_config(configuration)

    return configuration

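# Illustrative usage (the file name and values are hypothetical): given a
# config.yml containing
#
#     elasticsearch:
#       host: http://localhost:9200
#     service:
#       log_level: DEBUG
#
# load_config("config.yml") returns _default_config() deep-merged with those
# values, then applies any ent-search overrides (see _ent_search_config below).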

def add_defaults(config, default_config=None):
    if default_config is None:
        default_config = _default_config()
    configuration = dict(_merge_dicts(default_config, config))
    return configuration

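# For example, add_defaults({"service": {"idling": 10}}) keeps every default
# from _default_config() but overrides service.idling, since _merge_dicts
# prefers the caller's values over the defaults on conflicts.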

# Left: field name in Enterprise Search; Right: field name in Connectors
config_mappings = {
    "elasticsearch.host": "elasticsearch.host",
    "elasticsearch.username": "elasticsearch.username",
    "elasticsearch.password": "elasticsearch.password",
    "elasticsearch.headers": "elasticsearch.headers",
    "log_level": "service.log_level",
}

# Enterprise Search is written in Ruby and always uses lower-case log levels,
# so we translate them here for now.
# Ruby-supported log levels: 'debug', 'info', 'warn', 'error', 'fatal', 'unknown'
# Python-supported log levels: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', 'NOTSET'
log_level_mappings = {
    "debug": "DEBUG",
    "info": "INFO",
    "warn": "WARNING",
    "error": "ERROR",
    "fatal": "CRITICAL",
    "unknown": "NOTSET",
}
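# e.g. an ent-search log level of "warn" is translated to Python's "WARNING"
# before being applied in _ent_search_config below.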


def _default_config():
    return {
        "elasticsearch": {
            "host": "http://localhost:9200",
            "username": "elastic",
            "password": "changeme",
            "ssl": True,
            "verify_certs": True,
            "bulk": {
                "queue_max_size": 1024,
                "queue_max_mem_size": 25,
                "queue_refresh_interval": 1,
                "queue_refresh_timeout": 600,
                "display_every": 100,
                "chunk_size": 1000,
                "max_concurrency": 5,
                "chunk_max_mem_size": 5,
                "max_retries": DEFAULT_ELASTICSEARCH_MAX_RETRIES,
                "retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
                "concurrent_downloads": 10,
                "enable_operations_logging": False,
            },
            "max_retries": DEFAULT_ELASTICSEARCH_MAX_RETRIES,
            "retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
            "retry_on_timeout": True,
            "request_timeout": 120,
            "max_wait_duration": 120,
            "initial_backoff_duration": 1,
            "backoff_multiplier": 2,
            "log_level": "info",
            "feature_use_connectors_api": True,
        },
        "service": {
            "idling": 30,
            "heartbeat": 300,
            "preflight_max_attempts": 10,
            "preflight_idle": 30,
            "max_errors": 20,
            "max_errors_span": 600,
            "max_concurrent_content_syncs": 1,
            "max_concurrent_access_control_syncs": 1,
            "max_file_download_size": DEFAULT_MAX_FILE_SIZE,
            "job_cleanup_interval": 300,
            "log_level": "INFO",
        },
        "sources": {
            "onelake": "connectors.sources.onelake:OneLakeDataSource",
            "azure_blob_storage": "connectors.sources.azure_blob_storage:AzureBlobStorageDataSource",
            "box": "connectors.sources.box:BoxDataSource",
            "confluence": "connectors.sources.confluence:ConfluenceDataSource",
            "dir": "connectors.sources.directory:DirectoryDataSource",
            "dropbox": "connectors.sources.dropbox:DropboxDataSource",
            "github": "connectors.sources.github:GitHubDataSource",
            "gmail": "connectors.sources.gmail:GMailDataSource",
            "google_cloud_storage": "connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource",
            "google_drive": "connectors.sources.google_drive:GoogleDriveDataSource",
            "graphql": "connectors.sources.graphql:GraphQLDataSource",
            "jira": "connectors.sources.jira:JiraDataSource",
            "microsoft_teams": "connectors.sources.microsoft_teams:MicrosoftTeamsDataSource",
            "mongodb": "connectors.sources.mongo:MongoDataSource",
            "mssql": "connectors.sources.mssql:MSSQLDataSource",
            "mysql": "connectors.sources.mysql:MySqlDataSource",
            "network_drive": "connectors.sources.network_drive:NASDataSource",
            "notion": "connectors.sources.notion:NotionDataSource",
            "onedrive": "connectors.sources.onedrive:OneDriveDataSource",
            "oracle": "connectors.sources.oracle:OracleDataSource",
            "outlook": "connectors.sources.outlook:OutlookDataSource",
            "postgresql": "connectors.sources.postgresql:PostgreSQLDataSource",
            "redis": "connectors.sources.redis:RedisDataSource",
            "s3": "connectors.sources.s3:S3DataSource",
            "salesforce": "connectors.sources.salesforce:SalesforceDataSource",
            "servicenow": "connectors.sources.servicenow:ServiceNowDataSource",
            "sharepoint_online": "connectors.sources.sharepoint_online:SharepointOnlineDataSource",
            "sharepoint_server": "connectors.sources.sharepoint_server:SharepointServerDataSource",
            "slack": "connectors.sources.slack:SlackDataSource",
            "zoom": "connectors.sources.zoom:ZoomDataSource",
        },
    }

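# Any of these defaults can be overridden from the YAML file passed to
# load_config. An illustrative snippet (values are made up), using EnvYAML's
# ${VAR} environment-variable substitution:
#
#     elasticsearch:
#       host: ${ELASTICSEARCH_HOST}
#       bulk:
#         chunk_size: 500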

def _ent_search_config(configuration):
    if "ENT_SEARCH_CONFIG_PATH" not in os.environ:
        return
    logger.info("Found ENT_SEARCH_CONFIG_PATH, loading ent-search config")
    ent_search_config = EnvYAML(os.environ["ENT_SEARCH_CONFIG_PATH"])
    for es_field in config_mappings.keys():
        if es_field not in ent_search_config:
            continue

        connector_field = config_mappings[es_field]
        es_field_value = ent_search_config[es_field]

        if es_field == "log_level":
            if es_field_value not in log_level_mappings:
                msg = f"Unexpected log level: {es_field_value}. Allowed values: {', '.join(log_level_mappings.keys())}"
                raise ValueError(msg)
            es_field_value = log_level_mappings[es_field_value]

        _nest_configs(configuration, connector_field, es_field_value)

        logger.debug(f"Overridden {connector_field}")

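# Illustrative example: with ENT_SEARCH_CONFIG_PATH pointing at a YAML file
# containing
#
#     elasticsearch:
#       host: http://elastic:9200
#     log_level: warn
#
# the loop above overrides configuration["elasticsearch"]["host"] and sets
# configuration["service"]["log_level"] to "WARNING".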

def _nest_configs(configuration, field, value):
    """
    Update configuration field value taking into account the nesting.

    Configuration is a dict of nested dicts, so we need to dive inside to do proper assignment.

    E.g. _nest_configs({}, "elasticsearch.bulk.queuesize", 20) will result in the following config:
    {
        "elasticsearch": {
            "bulk": {
                "queuesize": 20
            }
        }
    }
    """
    subfields = field.split(".")
    last_key = subfields[-1]

    current_leaf = configuration
    for subfield in subfields[:-1]:
        if subfield not in current_leaf:
            current_leaf[subfield] = {}
        current_leaf = current_leaf[subfield]

    if isinstance(current_leaf.get(last_key), dict):
        current_leaf[last_key] = dict(_merge_dicts(current_leaf[last_key], value))
    else:
        current_leaf[last_key] = value

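# When the existing value and the new value are both dicts, they are merged
# rather than replaced. An illustrative call:
#
#     _nest_configs(config, "elasticsearch.bulk", {"chunk_size": 500})
#
# keeps the other "bulk" settings and only overrides chunk_size.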

def _merge_dicts(hsh1, hsh2):
    for k in set(hsh1.keys()).union(hsh2.keys()):
        if k in hsh1 and k in hsh2:
            if isinstance(hsh1[k], dict) and isinstance(
                hsh2[k], dict
            ):  # only merge objects
                yield (k, dict(_merge_dicts(hsh1[k], hsh2[k])))
            else:
                yield (k, hsh2[k])
        elif k in hsh1:
            yield (k, hsh1[k])
        else:
            yield (k, hsh2[k])

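# A worked example of the recursive merge; on conflicts the second dict wins:
#
#     dict(_merge_dicts({"a": {"b": 1, "c": 2}}, {"a": {"c": 3}}))
#     # -> {"a": {"b": 1, "c": 3}}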

class DataSourceFrameworkConfig:
    """
    The configs that will be exposed to DataSource instances.
    This abstraction prevents DataSource instances from having access to all
    configuration, while also sparing them substantial changes when new
    configs are added.
    """

    def __init__(self, max_file_size):
        """
        Should not be called directly. Use the Builder.
        """
        self.max_file_size = max_file_size

    class Builder:
        def __init__(self):
            self.max_file_size = DEFAULT_MAX_FILE_SIZE

        def with_max_file_size(self, max_file_size):
            self.max_file_size = max_file_size
            return self

        def build(self):
            return DataSourceFrameworkConfig(self.max_file_size)
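

# Illustrative usage of the builder (the 20MB limit is an arbitrary example):
#
#     framework_config = (
#         DataSourceFrameworkConfig.Builder()
#         .with_max_file_size(20 * 1024 * 1024)
#         .build()
#     )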