1
+ /**
2
+ * Author Krisjan Oldekamp / Stacktonic.com
3
+
4
+ * Article https://stacktonic.com/article/create-a-fast-user-profile-and-recommender-service-using-big-query-redis-and-gtm-server
5
+ * Based on https://futurice.com/blog/bigquery-to-memorystore
6
+ */
7
+
8
+ 'use strict' ;
9
+
10
+ const Redis = require ( 'ioredis' ) ;
11
+ const { Storage} = require ( '@google-cloud/storage' ) ;
12
+ const split = require ( 'split' ) ;
13
+
14
+ const REDISHOST = process . env . REDISHOST || 'localhost' ;
15
+ const REDISPORT = process . env . REDISPORT || 6379 ;
16
+ const EXPIRATION = process . env . EXPIRATION || 259200 ; // Expiration of records in seconds
17
+ const FILE_PATH_OUTBOUND = process . env . FILE_PATH_OUTBOUND || 'outbound' ;
18
+ const FILE_PATH_PROCESSED = process . env . FILE_PATH_PROCESSED || 'processed' ;
19
+ const FILE_PREFIX = process . env . FILE_PREFIX || '' ;
20
+
21
+ const redisClient = new Redis ( {
22
+ host : REDISHOST ,
23
+ port : REDISPORT ,
24
+ } ) ;
25
+
26
+ /**
27
+ * Triggered from a change to a Cloud Storage bucket.
28
+ *
29
+ * @param {!Object } event Event payload.
30
+ * @param {!Object } context Metadata for the event.
31
+ */
32
+ exports . loadCloudStorageToRedis = async ( info , context ) => {
33
+
34
+ const path = info . name . split ( '/' )
35
+ const fileName = path [ path . length - 1 ]
36
+
37
+ if ( info . metageneration === '1' && info . name . startsWith ( FILE_PATH_OUTBOUND ) && fileName . startsWith ( FILE_PREFIX ) ) {
38
+
39
+ console . log ( `New file upload: gs://${ info . bucket } /${ info . name } ` )
40
+
41
+ const storage = new Storage ( )
42
+ const bucket = storage . bucket ( info . bucket ) ;
43
+ const file = bucket . file ( info . name ) ;
44
+
45
+ let keysWritten = 0 ;
46
+
47
+ try {
48
+
49
+ // Read file and send to Redis
50
+ file . createReadStream ( )
51
+ . on ( 'error' , error => reject ( error ) )
52
+ . on ( 'response' , ( response ) => {
53
+ // connection to GCS opened
54
+ } ) . pipe ( split ( ) )
55
+ . on ( 'data' , function ( record ) {
56
+ if ( ! record || record === "" ) return ;
57
+ keysWritten ++ ;
58
+ const data = JSON . parse ( record ) ;
59
+ redisClient . set ( data . key , record , 'EX' , EXPIRATION ) ;
60
+ } )
61
+ . on ( 'end' , ( ) => {
62
+ console . log ( `Successfully written ${ keysWritten } keys to Memcache Redis.` ) ;
63
+
64
+ // Move file to processed folder
65
+ bucket . file ( info . name ) . move ( FILE_PATH_PROCESSED + '/' + fileName )
66
+ console . log ( `File moved to: ${ info . bucket } /${ FILE_PATH_PROCESSED } /${ fileName } ` )
67
+
68
+ resolve ( ) ;
69
+ } )
70
+ . on ( 'error' , error => reject ( error ) ) ;
71
+
72
+ } catch ( e ) {
73
+ console . log ( `Error importing ${ fileName } to Redis: ${ e } ` )
74
+ }
75
+ }
76
+ } ;
0 commit comments