[core] implement gcp storage #273
Conversation
Summary of Changes
Hello @slin1237, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly expands the storage capabilities by integrating Google Cloud Storage (GCS). It provides a comprehensive set of features for managing objects in GCS, focusing on data integrity, performance for large files, and secure access mechanisms. The changes lay the groundwork for robust cloud storage interactions within the system.
Highlights
- New GCS Storage Provider: Introduces a new Google Cloud Storage (GCS) provider, enabling interaction with GCS buckets and objects.
- Checksum Verification: Implements robust checksum verification using MD5 and CRC32C for ensuring data integrity during GCS operations.
- Multipart and Parallel Operations: Adds support for multipart uploads (simulated via GCS composite objects, as sketched below) and parallel downloads/uploads for efficient handling of large files.
- Presigned URLs: Provides functionality to generate and validate presigned URLs for temporary, secure access to GCS objects, including support for POST policies.
- Dependency Updates: Updates Go module dependencies to include the necessary Google Cloud Storage SDK and related libraries.
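For orientation, here is a minimal, hypothetical sketch of the compose step that underlies the simulated multipart upload, using the cloud.google.com/go/storage client; the bucket and object names are placeholders, not code from this PR:

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/storage"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	bucket := client.Bucket("example-bucket") // placeholder bucket

	// Each "part" of a simulated multipart upload is written as its own
	// temporary object; Compose then stitches them together server-side.
	// GCS compose accepts at most 32 sources per call, so very large
	// uploads must compose in stages.
	dst := bucket.Object("final/object")
	parts := []*storage.ObjectHandle{
		bucket.Object("parts/object.part-1"),
		bucket.Object("parts/object.part-2"),
	}
	attrs, err := dst.ComposerFrom(parts...).Run(ctx)
	if err != nil {
		log.Fatalf("compose failed: %v", err)
	}
	log.Printf("composed %s (%d bytes)", attrs.Name, attrs.Size)
}
```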
 
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e., a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |
Customization
To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.
You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
 
Code Review
This pull request introduces a comprehensive implementation for a Google Cloud Storage (GCS) provider, including functionality for integrity checks, multipart uploads, parallel operations, and presigned URLs. The code is well-structured across several new files. My review identifies several critical issues related to potential memory leaks, panics from unsafe type assertions, and buggy logic that could lead to data corruption or failed operations. I have also included suggestions to improve performance and code quality by using standard library features and optimizing file handling.
```go
var activeUploads = struct {
	sync.RWMutex
	uploads map[string]*compositeUpload
}{
	uploads: make(map[string]*compositeUpload),
}
```
Using a global variable activeUploads to track ongoing uploads can lead to a memory leak. If uploads are initiated but never completed or aborted (e.g., due to an application crash), the compositeUpload entries will remain in the map indefinitely. A better approach would be to store this map within the Provider struct, tying its lifecycle to the provider instance. Alternatively, you could implement a periodic cleanup mechanism for stale uploads.
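A minimal sketch of the provider-scoped alternative, assuming the Provider and compositeUpload types from this PR (the field and method names below are hypothetical):

```go
// Keeping the registry on the Provider ties its lifetime to the provider
// instance instead of a package-level global.
type Provider struct {
	// ... existing fields ...
	mu      sync.RWMutex
	uploads map[string]*compositeUpload
}

func (p *Provider) trackUpload(id string, u *compositeUpload) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.uploads == nil {
		p.uploads = make(map[string]*compositeUpload)
	}
	p.uploads[id] = u
}

// dropUpload should be called from both the complete and abort paths.
func (p *Provider) dropUpload(id string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.uploads, id)
}
```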
```go
sortedParts := make([]compositePart, len(upload.parts))
for _, part := range upload.parts {
	if part.partNumber > 0 && part.partNumber <= len(sortedParts) {
		sortedParts[part.partNumber-1] = part
	}
}
```
The logic for sorting parts is buggy and can lead to incorrect behavior or panics. It assumes that part numbers are dense and start from 1, and that the number of parts is known. If part numbers are sparse (e.g., 1 and 1000), this will fail. You should sort the upload.parts slice directly using sort.Slice.
```go
// Sort parts by part number
sort.Slice(upload.parts, func(i, j int) bool {
	return upload.parts[i].partNumber < upload.parts[j].partNumber
})
sortedParts := upload.parts
```

```go
for key, value := range conditions {
	switch key {
	case "key":
		objectKey = value.(string)
		policyConditions = append(policyConditions, storage.ConditionStartsWith("key", objectKey))
	case "content-type":
		// Content type condition - using starts-with condition
		policyConditions = append(policyConditions, storage.ConditionStartsWith("content-type", value.(string)))
	case "content-length-range":
		if rangeVals, ok := value.([]int64); ok && len(rangeVals) == 2 {
			policyConditions = append(policyConditions, storage.ConditionContentLengthRange(uint64(rangeVals[0]), uint64(rangeVals[1])))
		}
	}
}
```
The type assertions for key and content-type on lines 127 and 131 are unsafe and will cause a panic if the conditions map contains a value of a different type. You should use the two-value value, ok form to safely check the type before using the value.
```go
for key, value := range conditions {
	switch key {
	case "key":
		if v, ok := value.(string); ok {
			objectKey = v
			policyConditions = append(policyConditions, storage.ConditionStartsWith("key", objectKey))
		}
	case "content-type":
		// Content type condition - using starts-with condition
		if v, ok := value.(string); ok {
			policyConditions = append(policyConditions, storage.ConditionStartsWith("content-type", v))
		}
	case "content-length-range":
		if rangeVals, ok := value.([]int64); ok && len(rangeVals) == 2 {
			policyConditions = append(policyConditions, storage.ConditionContentLengthRange(uint64(rangeVals[0]), uint64(rangeVals[1])))
		}
	}
}
```

```go
func (p *Provider) calculateFileChecksum(filePath string) (md5Hash []byte, crc32c uint32, err error) {
	file, err := os.Open(filePath)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// Calculate MD5
	md5Hasher := md5.New()
	if _, err := io.Copy(md5Hasher, file); err != nil {
		return nil, 0, fmt.Errorf("failed to calculate MD5: %w", err)
	}
	md5Hash = md5Hasher.Sum(nil)

	// Reset file position for CRC32C calculation
	if _, err := file.Seek(0, 0); err != nil {
		return nil, 0, fmt.Errorf("failed to reset file position: %w", err)
	}

	// Calculate CRC32C
	table := crc32.MakeTable(crc32.Castagnoli)
	crc32Hasher := crc32.New(table)
	if _, err := io.Copy(crc32Hasher, file); err != nil {
		return nil, 0, fmt.Errorf("failed to calculate CRC32C: %w", err)
	}
	crc32c = crc32Hasher.Sum32()

	return md5Hash, crc32c, nil
}
```
This function reads the file twice to calculate MD5 and CRC32C hashes, which is inefficient, especially for large files. You can calculate both hashes in a single pass by using io.MultiWriter to write to both hashers simultaneously.
```go
func (p *Provider) calculateFileChecksum(filePath string) (md5Hash []byte, crc32c uint32, err error) {
	file, err := os.Open(filePath)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	md5Hasher := md5.New()
	table := crc32.MakeTable(crc32.Castagnoli)
	crc32Hasher := crc32.New(table)

	// Use io.MultiWriter to calculate both hashes in a single pass.
	multiWriter := io.MultiWriter(md5Hasher, crc32Hasher)
	if _, err := io.Copy(multiWriter, file); err != nil {
		return nil, 0, fmt.Errorf("failed to calculate checksums: %w", err)
	}

	md5Hash = md5Hasher.Sum(nil)
	crc32c = crc32Hasher.Sum32()
	return md5Hash, crc32c, nil
}
```

```go
}

// Generate a unique upload ID
uploadID := fmt.Sprintf("%s-%d", objectName, time.Now().UnixNano())
```
Using time.Now().UnixNano() to generate a unique upload ID is not guaranteed to be unique, especially in a distributed or highly concurrent environment. This could lead to collisions and data corruption. It's safer to use a universally unique identifier (UUID).
```diff
-	uploadID := fmt.Sprintf("%s-%d", objectName, time.Now().UnixNano())
+	uploadID := uuid.New().String()
```
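This assumes the github.com/google/uuid module is added to go.mod. If keeping the object name in the ID helps debugging, the two can be combined (a hypothetical variant, not part of the suggestion):

```go
// Object name prefix for readability, UUID suffix for uniqueness.
uploadID := fmt.Sprintf("%s-%s", objectName, uuid.New().String())
```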
```go
func bytesEqual(a, b []byte) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}
```
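This helper can be replaced wholesale by bytes.Equal from the standard library, which performs the same length-and-contents comparison:

```go
import "bytes"

// bytesEqual becomes a thin wrapper; callers could also use bytes.Equal directly.
func bytesEqual(a, b []byte) bool {
	return bytes.Equal(a, b)
}
```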
```go
}

// UploadPart uploads a single part of a multipart upload
func (p *Provider) UploadPart(ctx context.Context, uri string, uploadID string, partNumber int, data io.Reader, size int64) (string, error) {
```
The size parameter in this function signature is shadowed by a new size variable declared on line 90 (size, err := io.Copy(writer, data)). This means the input size is never used. You should either use the provided size (e.g., with io.CopyN) or remove it from the function signature if it's not needed.
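One way to honor the declared size, sketched against the writer, data, and partNumber names visible in this diff (the error message is illustrative):

```go
// Copy exactly `size` bytes; io.CopyN returns io.EOF if the reader
// runs short, and any excess input is simply left unread.
if _, err := io.CopyN(writer, data, size); err != nil {
	return "", fmt.Errorf("failed to upload part %d: %w", partNumber, err)
}
```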
```go
defer reader.Close()

// Read the chunk data
data := make([]byte, chunk.size)
```
This function allocates a new buffer for each chunk. Since a BufferPool is defined in this package, you could use it here to reduce memory allocations and garbage collection pressure, which can improve performance for downloads with many small chunks.
```go
// Get a buffer from the pool
buf := BufferPool.Get().([]byte)
defer BufferPool.Put(buf)

// Read the chunk data
data := buf[:chunk.size]
n, err := io.ReadFull(reader, data)
```

```go
type ParallelUploadOptions struct {
	ChunkSize    int64
	Parallelism  int
	OnProgress   func(uploaded, total int64)
	StorageClass string
}
```
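For context, a hypothetical call site showing how these options might be populated; the values here are illustrative, not taken from this PR:

```go
opts := ParallelUploadOptions{
	ChunkSize:    16 << 20, // 16 MiB per chunk
	Parallelism:  4,        // number of concurrent chunk uploads
	StorageClass: "STANDARD",
	OnProgress: func(uploaded, total int64) {
		log.Printf("uploaded %d/%d bytes", uploaded, total)
	},
}
```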
What type of PR is this?
/kind feature
What this PR does / why we need it:
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Does this PR introduce a user-facing change?