diff --git a/BootServers.dyalog b/BootServers.dyalog index 8fa3bb6..6a191c6 100644 --- a/BootServers.dyalog +++ b/BootServers.dyalog @@ -2,17 +2,22 @@ ⍝ Start a vecdb server process if VECDBSRV="config.json" PORT=nnnn ⍝ vecdb slave process if VECDBSLAVE="file" SHARDS="n" PORT=nnnn - getenv←{0=≢2 ⎕NQ'.' 'GetEnvironment'⍵:⍺} + getenv←{0=≢r←2 ⎕NQ'.' 'GetEnvironment'⍵:⍺ ⋄ r} getnum←{⊃2⊃⎕VFI ⍵} path←'file://',⊃⎕NPARTS ⎕WSID - VECDBSRV←0≠⍴CONFIG←''getenv'VECDBSRV' - VECDBSLAVE←0≠VECDB←''getenv'VECDBSLAVE' + VECDBSRV←0≠≢SRVDB←''getenv'VECDBSRV' + VECDBSLAVE←0≠≢VECDB←''getenv'VECDBSLAVE' + SHARDS←2⊃⎕VFI''getenv'SHARDS' + TOKEN←2⊃⎕VFI''getenv'TOKEN' + port←getnum''getenv'PORT' 2 ⎕FIX path,'APLProcess.dyalog' 2 ⎕FIX path,'vecdb.dyalog' + 2 ⎕FIX path,'vecdbclt.dyalog' 2 ⎕FIX path,'vecdbsrv.dyalog' + 2 ⎕FIX path,'vecdbslave.dyalog' :If 0=⎕NC'DRC' ⍝ Get conga if necessary 'DRC'⎕CY'conga'getenv'CONGAWS' @@ -24,10 +29,11 @@ ' ',2 ⎕FIX path,'TestVecdbSrv.dyalog' :Else + AUTOSHUT←1 {}1 ##.DRC.Init'' - :If VECDBSRV ⋄ vecdbsrv.Start CONFIG port - :ElseIf VECDBSLAVE ⋄ vecdbslave.Start VECDB port + :If VECDBSRV ⋄ vecdbsrv.Start SRVDB port + :ElseIf VECDBSLAVE ⋄ vecdbslave.Start VECDB SHARDS port :Else ⎕←'Invalid configuration...' :EndIf diff --git a/TestVecdbSrv.dyalog b/TestVecdbSrv.dyalog index 5de800f..fbe2c0c 100644 --- a/TestVecdbSrv.dyalog +++ b/TestVecdbSrv.dyalog @@ -11,7 +11,8 @@ :Trap 6 ⋄ source←SALT_Data.SourceFile :Else ⋄ source←⎕WSID :EndTrap - path←{(-⌊/(⌽⍵)⍳'\/')↓⍵}source + path←{(-⌊/(⌽⍵)⍳'\/')↓⍵}source + :If 0=⎕NC '#.DRC' ⋄ 'DRC' #.⎕CY 'conga' ⋄ :EndIf ⎕←ServerBasic ∇ @@ -24,28 +25,15 @@ vecdbsrv.Users←,user db←⎕NS'' db.Folder←folder - db.Slaves←,¨1 2 ⍝ Distribution of shards to slave processors + db.Slaves←⎕NS¨2⍴⊂'' + db.Slaves.Shards←,¨1 2 ⍝ Distribution of shards to slave processors config←⎕NS'' config.Server←vecdbsrv config.DBs←,db (toJson config)⎕NPUT filename ∇ - - ∇ z←ServerBasic;columns;data;options;params;folder;types;name;ix;users;srvproc;clt;TEST - ⍝ Test database with 2 shards - ⍝ Also acts as test for add/remove columns - - folder←path,'/',(name←'srvtest'),'/' - ⎕←'Clearing: ',folder - :Trap 22 ⋄ #.vecdb.Delete folder ⋄ :EndTrap - ⎕MKDIR folder - - ⍝ --- Create configuration file --- - - CreateTestConfig folder,'config.json' - - ⍝ --- Create database --- - + + ∇ (db params)←CreateTestDB;columns;types;data;options columns←'Name' 'BlockSize' 'Flag' types←,¨'C' 'F' 'C' data←('IBM' 'AAPL' 'MSFT' 'GOOG' 'DYALOG')(160.97 112.6 47.21 531.23 999.99)(5⍴'Buy' 'Sell') @@ -56,25 +44,38 @@ options.(ShardFn ShardCols)←'{2-2|⎕UCS ⊃¨⊃⍵}' 1 params←name folder columns types options data - TEST←'Create sharded database' db←⎕NEW #.vecdb params - assert (≢data)=db.Count + assert (≢⊃data)=db.Count + ∇ + + ∇ z←ServerBasic;columns;data;options;params;folder;types;name;ix;users;srvproc;clt;TEST;config;db + ⍝ Test database with 2 shards + ⍝ Also acts as test for add/remove columns + + folder←path,'/',(name←'srvtest'),'/' + ⎕←'Clearing: ',folder + :Trap 22 ⋄ {}#.vecdb.Delete folder ⋄ :EndTrap + ⎕MKDIR folder + + ⍝ --- Create configuration file --- + + config←CreateTestConfig folder,'config.json' + (db (name folder columns types options data))←CreateTestDB ⍝ --- Launch and connect to server, open database --- - srvproc←#.vecdbsrv.Launch folder 8100 - assert 0=srvproc.HasExited + srvproc←#.vecdbsrv.Launch folder 8100 + #.vecdbclt.Connect '127.0.0.1' 8100 'mkrom' + db←#.vecdbclt.Open folder - clt←#.vecdbclt.Connect '127.0.0.1' 8100 'mkrom' - db←clt.Open folder - + ∘∘∘ + assert (≢⊃data)=db.Count ix←db.Query('Name'((columns⍳⊂'Name')⊃data))⍬ ⍝ Should find everything assert(1 2,⍪⍳¨4 1)≡ix TEST←'Read it all back' assert data≡db.Read time ix columns - z←db.Close - clt.ShutDown 'Shutting down now!' + z←db.Shutdown 'Shutting down now!' ⎕DL 3 svrproc.Kill ⎕DL 3 diff --git a/dev.dyapp b/dev.dyapp index edd52ae..444feae 100644 --- a/dev.dyapp +++ b/dev.dyapp @@ -1,6 +1,7 @@ Load vecdb Load vecdbclt Load vecdbsrv +Load vecdbslave Load MakeBoot Load BootServers Load APLProcess diff --git a/vecdb.dyalog b/vecdb.dyalog index 8684f6b..0f82bac 100644 --- a/vecdb.dyalog +++ b/vecdb.dyalog @@ -52,7 +52,7 @@ :Property Count :Access public ∇ r←get - r←⊃+/_Counts.counter + r←⊃+/_Counts[ShardSelected].counter ∇ :EndProperty diff --git a/vecdbboot.dws b/vecdbboot.dws index 74e0029..4276cb1 100644 Binary files a/vecdbboot.dws and b/vecdbboot.dws differ diff --git a/vecdbclt.dyalog b/vecdbclt.dyalog index b169161..4a9c8bd 100644 --- a/vecdbclt.dyalog +++ b/vecdbclt.dyalog @@ -1,16 +1,31 @@ :Namespace vecdbclt - (⎕IO ⎕ML ⎕WX)←1 0 3 + (⎕IO ⎕ML)←1 1 SERVER←'' - ∇ r←Connect(address port user) + ∇ r←Clt(connection address port) + :If 1111=⊃r←##.DRC.Clt connection address port + {}⎕DL 0.5 + :AndIf 1111=⊃r←##.DRC.Clt connection address port + {}⎕DL 1 + :AndIf 1111=⊃r←##.DRC.Clt connection address port + {}⎕DL 3 + :AndIf 1111=⊃r←##.DRC.Clt connection address port + {}⎕DL 5 + :AndIf 1111=⊃r←##.DRC.Clt connection address port + (⍕r)⎕SIGNAL 11 + :EndIf + ∇ + + ∇ {r}←{connection}Connect(address port user) ⍝ Connect to vecdb server process + :If 0=⎕NC'connection' ⋄ connection←'VECDB' ⋄ :EndIf + :If 0=⊃r←##.DRC.Init'' - {}##.DRC.Close CONNECTION←'VECDB' - :AndIf 0=⊃r←##.DRC.Clt CONNECTION address port - :AndIf 0=⊃r←SrvDo CONNECTION('CltSetUser'user) - r←SrvDo CONNECTION('CltSetUser'user) + :If 0≠⍴connection ⋄ {}##.DRC.Close connection ⋄ :EndIf + :AndIf 0=⊃r←Clt connection address port + CONNECTION←2⊃r :Else ('Error: ',,⍕r)⎕SIGNAL 11 :EndIf @@ -44,45 +59,66 @@ :EndIf ∇ - ∇ r←Open database + ∇ r←Open folder ⍝ Cover-function for call to Lock from a Client - :If 0=⊃r←SrvDo CONNECTION ('CltOpen' database) - r←⎕NEW vecdbproxy (2⊃r) - :Else - (,⍕r) ⎕SIGNAL 11 - :EndIf - ∇ - + r←⎕NEW vecdbproxy(folder CONNECTION) + ∇ + :Class vecdbproxy ⍝ Produce a vecdb proxy object for a served vecdb - - ∇Open (name connection) - :Access Public - :Implements Constructor - (NAME CONNECTION)←name connection - ∇ - - ∇Close - :Access Public - - ∇ - - ∇r←Append args - :Access Public -∇ - - ∇r←Query args - :Access Public - ∇ - - ∇r←Read args - :Access Public - ∇ - - ∇r←Update args - ∇ + ∇ Open(folder connection) + :Access Public + :Implements Constructor + (FOLDER CONNECTION)←folder connection + :If 0=⊃r←##.SrvDo CONNECTION('Open'folder) + ⎕DF'[vecdbclt: ',folder,']' + :Else + (⍕r)⎕SIGNAL 11 + :EndIf + ∇ + + ∇ Shutdown msg + :Access Public + :If 0=⊃r←##.SrvDo CONNECTION('Shutdown' msg) + {}#.DRC.Close CONNECTION + CONNECTION←'' + :EndIf + ∇ + + ∇ Close + :Access Public + :If 0=⊃r←##.SrvDo CONNECTION('Close' ⍬) + {}#.DRC.Close CONNECTION + CONNECTION←'' + :EndIf + ∇ + + ∇ r←Count + :Access Public + :If 0≠⍴CONNECTION + r←##.SrvDo CONNECTION('Count' (FOLDER ⍬)) + r←+/r + :Else + 'CONNECTION CLOSED' ⎕SIGNAL 11 + :EndIf + ∇ + + ∇ r←Append args + :Access Public + ∇ + + ∇ r←Query args + :Access Public + ∇ + + ∇ r←Read args + :Access Public + ∇ + + ∇ r←Update args + ∇ :EndClass diff --git a/vecdbslave.dyalog b/vecdbslave.dyalog new file mode 100644 index 0000000..6b07fa7 --- /dev/null +++ b/vecdbslave.dyalog @@ -0,0 +1,202 @@ +:Namespace vecdbslave + + (⎕IO ⎕ML)←1 1 + LOGLEVEL←0 + + fmtts←{,'ZI4,<->,ZI2,<->,ZI2,< >,ZI2,<:>,ZI2,<:>,ZI2' ⎕FMT 1 6⍴⍵} + + ∇ {r}←Shutdown dummy + ⍝ Shut down slave + + DB.Close ⍝ Close the vecdb + ⎕EX 'DB' + done←1 ⍝ Global flag to shut down + r←⍬ ⍝ Need a result + ∇ + + ∇ Init(folder shards) + STATE←1 ⍝ Starting, 0=Running, 2=Startup Failed, 3=Shut Down + 1 Log STATUS←'Startup initiated at ',fmtts ⎕TS + CONNS←TASKS←USERS←TOKENS←⍬ + NEXTTASK←1000 + + :Trap 0 + DB←⎕NEW ##.vecdb(folder shards) + STATE←0 + 1 Log'Slave startup completed, ',STATUS←'Folder= ',folder,', shards= ',⍕shards + :Else + STATE←2 ⍝ Startup Failed + 3 Log STATUS←'Startup failed: ',∊⎕DM + :EndTrap + ∇ + + ∇ {r}←Start(folder shards port);sink;data;event;obj;rc;wait;z;cmd;name + ⍝ Run a vecdb Slave - based on CONGA RPCServer sample + + {}##.DRC.Init'' + {}##.DRC.Close name←'VECSRV' + + Init folder shards + + :If 0=1⊃r←##.DRC.Srv name''port'Command' + 1 Log'Server ''',name,''', listening on port ',⍕port + 2 Log'Handler thread started: ',⍕Run&name port + :Else + 3 Log'Server failed to start: ',,⍕r + :EndIf + ∇ + + ∇ Connect cmd;task;conn + ⍝ Connection Created + + conn←1↓⊃(cmd='.')⊂cmd + CONNS,←⊂conn + TASKS,←task←NEXTTASK + NEXTTASK←10000|NEXTTASK+1 + USERS←USERS,0 + TOKENS←TOKENS,⊂'' + + 0 Log'New connection ',conn,' assigned task id ',⍕task + ∇ + + ∇ Disconnect obj;m;i;held;task;conn + ⍝ Connection Lost + + conn←1↓⊃(obj='.')⊂obj + 0 Log'Connection ',conn,' disconnected' + + :If (⍴m)≥i←(m←~CONNS∊⊂conn)⍳0 + CONNS←m/CONNS + TASKS←m/TASKS + USERS←m/USERS + TOKENS←m/TOKENS + :EndIf + ∇ + + ∇ level Log message + →(level,ZI2,<:>,ZI2,<.>,ZI3'⎕FMT 1 4⍴3↓⎕TS),' ',message + ∇ + + ∇ Process(obj data);r;CONNECTION;cmd;arg;close;txt + ⍝ Process a call. data[1] contains function name, data[2] an argument + + ⍝ {}##.DRC.Progress obj(' Thread ',(⍕⎕TID),' started to run: ',,⍕data) ⍝ Send progress report + CONNECTION←obj + Conn←1↓⊃(obj='.')⊂obj + (cmd arg)←2↑data + close←0 + + :If (⊂cmd)∊'SetToken' 'SetUser' 'Shutdown' + r←0 (⍎cmd,' obj arg') + + :ElseIf (⊂cmd)∊'Append' 'Count' 'Query' 'Update' 'Read' + :If 0≠≢(CONNS⍳⊂Conn)⊃TOKENS,⊂'' + :Trap 9999 + :If cmd≡'Count' ⋄ r←0 DB.Count + :Else ⋄ r←0 ((DB⍎cmd) arg) + :EndIf + :Else ⋄ r←⎕EN ⎕DM + :EndTrap + :Else + close←1 + r←999 ('No valid token provided for command ',⍕cmd arg) + :EndIf + + :Else + r←999 ('Unsupported command: ',cmd) + :EndIf + + {}##.DRC.Respond obj r + + :If close + ⍝ /// {{}##.DRC.Close ⍵⊣⎕DL 1}&Conn ⍝ Start thread which waits 1s then closes + :EndIf + ∇ + + + ∇ r←Run(name port);sink;data;event;obj;rc;wait;z;cmd + ⍝ Run the Lock Server - based on CONGA RPCServer sample + + :If 0=⎕NC'start' ⋄ start←1 ⋄ :EndIf + {}##.DRC.Init'' + + 0 Log'Thread ',(⍕⎕TID),' is now handing server ''',name,'''.' + done←0 ⍝ done←1 in function "End" + :While ~done + rc obj event data←4↑wait←##.DRC.Wait name 3000 ⍝ Time out now and again + + :Select rc + :Case 0 + :Select event + :Case 'Error' + :If 1119≢data ⋄ 3 Log'Error ',(⍕data),' on ',obj ⋄ :EndIf + :If ~done∨←name≡obj ⍝ Error on the listener itself? + {}##.DRC.Close obj ⍝ Close connection in error + Disconnect obj ⍝ Let logic know + :EndIf + + :Case 'Receive' + :If 2≠⍴data ⍝ Command is expected to be (function name)(argument) + {}##.DRC.Respond obj(99999 'Bad command format') ⋄ :Leave + :EndIf + + Process obj data ⍝ NB Single-threaded + + :Case 'Connect' + Connect obj + + :Else ⍝ Unexpected result? + ∘ + :EndSelect + + :Case 100 ⍝ Time out - Insert code for housekeeping tasks here (deadlocks?) + + :Case 1010 ⍝ Object Not Found + 3 Log'Object ''',name,''' has been closed - RPC Server shutting down' ⋄ done←1 + + :Else + 3 Log'Error in RPC.Wait: ',⍕wait + :EndSelect + :EndWhile + ⎕DL 1 ⍝ Give responses time to complete + {}##.DRC.Close name + 0 Log'Server ',name,' terminated.' + + :If 2=⎕NC '#.AUTOSHUT' + :AndIf 0≠#.AUTOSHUT + ⎕OFF + :EndIf + ∇ + + ∇ task←SetUser(cmd User);i;Conn + ⍝ Return task ID + + Conn←1↓⊃(cmd='.')⊂cmd + + :If (⍴CONNS)