From d10e906d80930199e7e58e21b6c3120f85eebc0c Mon Sep 17 00:00:00 2001 From: Morten Kromberg Date: Sun, 28 Aug 2016 09:33:16 +0200 Subject: [PATCH] Executed first distributed command (Count) --- BootServers.dyalog | 16 ++- TestVecdbSrv.dyalog | 55 ++++++----- dev.dyapp | 1 + vecdb.dyalog | 2 +- vecdbboot.dws | Bin 41528 -> 41920 bytes vecdbclt.dyalog | 116 ++++++++++++++-------- vecdbslave.dyalog | 202 +++++++++++++++++++++++++++++++++++++ vecdbsrv.dyalog | 235 +++++++++++++++++++++++--------------------- 8 files changed, 442 insertions(+), 185 deletions(-) create mode 100644 vecdbslave.dyalog diff --git a/BootServers.dyalog b/BootServers.dyalog index 8fa3bb6..6a191c6 100644 --- a/BootServers.dyalog +++ b/BootServers.dyalog @@ -2,17 +2,22 @@ ⍝ Start a vecdb server process if VECDBSRV="config.json" PORT=nnnn ⍝ vecdb slave process if VECDBSLAVE="file" SHARDS="n" PORT=nnnn - getenv←{0=≢2 ⎕NQ'.' 'GetEnvironment'⍵:⍺} + getenv←{0=≢r←2 ⎕NQ'.' 'GetEnvironment'⍵:⍺ ⋄ r} getnum←{⊃2⊃⎕VFI ⍵} path←'file://',⊃⎕NPARTS ⎕WSID - VECDBSRV←0≠⍴CONFIG←''getenv'VECDBSRV' - VECDBSLAVE←0≠VECDB←''getenv'VECDBSLAVE' + VECDBSRV←0≠≢SRVDB←''getenv'VECDBSRV' + VECDBSLAVE←0≠≢VECDB←''getenv'VECDBSLAVE' + SHARDS←2⊃⎕VFI''getenv'SHARDS' + TOKEN←2⊃⎕VFI''getenv'TOKEN' + port←getnum''getenv'PORT' 2 ⎕FIX path,'APLProcess.dyalog' 2 ⎕FIX path,'vecdb.dyalog' + 2 ⎕FIX path,'vecdbclt.dyalog' 2 ⎕FIX path,'vecdbsrv.dyalog' + 2 ⎕FIX path,'vecdbslave.dyalog' :If 0=⎕NC'DRC' ⍝ Get conga if necessary 'DRC'⎕CY'conga'getenv'CONGAWS' @@ -24,10 +29,11 @@ ' ',2 ⎕FIX path,'TestVecdbSrv.dyalog' :Else + AUTOSHUT←1 {}1 ##.DRC.Init'' - :If VECDBSRV ⋄ vecdbsrv.Start CONFIG port - :ElseIf VECDBSLAVE ⋄ vecdbslave.Start VECDB port + :If VECDBSRV ⋄ vecdbsrv.Start SRVDB port + :ElseIf VECDBSLAVE ⋄ vecdbslave.Start VECDB SHARDS port :Else ⎕←'Invalid configuration...' :EndIf diff --git a/TestVecdbSrv.dyalog b/TestVecdbSrv.dyalog index 5de800f..fbe2c0c 100644 --- a/TestVecdbSrv.dyalog +++ b/TestVecdbSrv.dyalog @@ -11,7 +11,8 @@ :Trap 6 ⋄ source←SALT_Data.SourceFile :Else ⋄ source←⎕WSID :EndTrap - path←{(-⌊/(⌽⍵)⍳'\/')↓⍵}source + path←{(-⌊/(⌽⍵)⍳'\/')↓⍵}source + :If 0=⎕NC '#.DRC' ⋄ 'DRC' #.⎕CY 'conga' ⋄ :EndIf ⎕←ServerBasic ∇ @@ -24,28 +25,15 @@ vecdbsrv.Users←,user db←⎕NS'' db.Folder←folder - db.Slaves←,¨1 2 ⍝ Distribution of shards to slave processors + db.Slaves←⎕NS¨2⍴⊂'' + db.Slaves.Shards←,¨1 2 ⍝ Distribution of shards to slave processors config←⎕NS'' config.Server←vecdbsrv config.DBs←,db (toJson config)⎕NPUT filename ∇ - - ∇ z←ServerBasic;columns;data;options;params;folder;types;name;ix;users;srvproc;clt;TEST - ⍝ Test database with 2 shards - ⍝ Also acts as test for add/remove columns - - folder←path,'/',(name←'srvtest'),'/' - ⎕←'Clearing: ',folder - :Trap 22 ⋄ #.vecdb.Delete folder ⋄ :EndTrap - ⎕MKDIR folder - - ⍝ --- Create configuration file --- - - CreateTestConfig folder,'config.json' - - ⍝ --- Create database --- - + + ∇ (db params)←CreateTestDB;columns;types;data;options columns←'Name' 'BlockSize' 'Flag' types←,¨'C' 'F' 'C' data←('IBM' 'AAPL' 'MSFT' 'GOOG' 'DYALOG')(160.97 112.6 47.21 531.23 999.99)(5⍴'Buy' 'Sell') @@ -56,25 +44,38 @@ options.(ShardFn ShardCols)←'{2-2|⎕UCS ⊃¨⊃⍵}' 1 params←name folder columns types options data - TEST←'Create sharded database' db←⎕NEW #.vecdb params - assert (≢data)=db.Count + assert (≢⊃data)=db.Count + ∇ + + ∇ z←ServerBasic;columns;data;options;params;folder;types;name;ix;users;srvproc;clt;TEST;config;db + ⍝ Test database with 2 shards + ⍝ Also acts as test for add/remove columns + + folder←path,'/',(name←'srvtest'),'/' + ⎕←'Clearing: ',folder + :Trap 22 ⋄ {}#.vecdb.Delete folder ⋄ :EndTrap + ⎕MKDIR folder + + ⍝ --- Create configuration file --- + + config←CreateTestConfig folder,'config.json' + (db (name folder columns types options data))←CreateTestDB ⍝ --- Launch and connect to server, open database --- - srvproc←#.vecdbsrv.Launch folder 8100 - assert 0=srvproc.HasExited + srvproc←#.vecdbsrv.Launch folder 8100 + #.vecdbclt.Connect '127.0.0.1' 8100 'mkrom' + db←#.vecdbclt.Open folder - clt←#.vecdbclt.Connect '127.0.0.1' 8100 'mkrom' - db←clt.Open folder - + ∘∘∘ + assert (≢⊃data)=db.Count ix←db.Query('Name'((columns⍳⊂'Name')⊃data))⍬ ⍝ Should find everything assert(1 2,⍪⍳¨4 1)≡ix TEST←'Read it all back' assert data≡db.Read time ix columns - z←db.Close - clt.ShutDown 'Shutting down now!' + z←db.Shutdown 'Shutting down now!' ⎕DL 3 svrproc.Kill ⎕DL 3 diff --git a/dev.dyapp b/dev.dyapp index edd52ae..444feae 100644 --- a/dev.dyapp +++ b/dev.dyapp @@ -1,6 +1,7 @@ Load vecdb Load vecdbclt Load vecdbsrv +Load vecdbslave Load MakeBoot Load BootServers Load APLProcess diff --git a/vecdb.dyalog b/vecdb.dyalog index 8684f6b..0f82bac 100644 --- a/vecdb.dyalog +++ b/vecdb.dyalog @@ -52,7 +52,7 @@ :Property Count :Access public ∇ r←get - r←⊃+/_Counts.counter + r←⊃+/_Counts[ShardSelected].counter ∇ :EndProperty diff --git a/vecdbboot.dws b/vecdbboot.dws index 74e0029a9143ca642fa481d8731a66735834f7aa..4276cb11ed575ee652d5b295dec7b0eefc975f2c 100644 GIT binary patch delta 3415 zcmZWrZA_cj6+ZVGh%q(6hy-H-7^m}G(oh2(^IX9UO{kl;34x3^?WQh#gqJXi9Uv%a z7O?q9_)4(35KKsdRjn#&tBurBBeeW$P zHG1B2?>Wyo_uO+oxbb)C?e}QrtGexUI~@ECP)Phj3*pRi-7h^o%tZDkt-8X=*myYU zgUWDnxFjp-KMpQmE)s@&p-?g_r2AsE#7Z(FQ65Epy$g)!8rUiwp=;-2O}b*e&AcFu zTPjuqr+$Ay}Gh|c}L&hcEhl{Mv z)W|t2LKip2-Z9w^lN|P0)9Zmi6lfqN~|f zfb%V5(Q{E|B~JRyws6oMi}baT9DkUPa(ar(&^a!@U?(KiQpKST6gfmr+HCQ{V`N#G6DC2Q7~KnJfQev4Q^{0%{g4ssC( z356!aO++50utW(wj6O6R+|8U3D~)OsrTQRikJxe6tbMzk#UmM(<$x9%VDU7U(IA)l zU^w_gmO58pagA_|Wbz&W#;Trcpn(A`aC8u>RUTVu9U;s=Z?-HCbK1&zF1bym24-%S0NI z1TwO4(u#Jifv%yL&$q}VLvvOZZzq-BZTEIKyEi?<}K0nv|2DK_lwvDikmoUp~& z2dnnO6RI8e=WM_*5 zPJ?rjtDF~E(Rq6c56b?MzDF;(|JDX&o?(6RkwZ`IKgf)Oxj9yG?Bn9tuiej`8r-d2 zSk!Zgxd#vDr)cq4g@ain5)n7jMk4s_K~E}(Jh;X>WYjfx^f2d;GxgXO%bx5+$dFL; z^Z{;9;&66T@A2lArtXf$-s8nBXBsJ4K>YjPIFddMN8Vm3VLy}66`HzGrj;~RFEUIht!9zOlbyYd9Vc3>d^eryYV0}T>ntuVW_QyarGDY^ z)&KZUC!B~A5(gkO%N2KpY`eRy|rnVTO&N@M>Zg`}?TP z;F#uo|1`6V*_kV1W`R+CPFq9_C&bb*-socg7%wnbFY}%Q2N|=mp|LFHA2m5-Cc-LQ zLHgF?!E1nhP@RYo7p#>Tj8S9oe(TH*jN6%a!n|iyR6vgQ{diue!kGJ@p1n4jR}vRF z%g9^#N8k`Vg~$J(Fnq;sj2Ap4N7>4*!plPQs0VMN1*{}epw)(2BW!BClEN(eqbBB9 z=0qinkDIA?mbFjR(6$J(OxXH6pb>iEcp~l}#3B+g3Kgp<2j5%;PKb+D0kM2?8Rerv zUNFlB&JPKm^QXM;KLhN~7mfXLiaajz zp&~COamM>$>*y=aUfJ&S}R6gzA9=hgnV*+H2IElzh!wIL-gO=vQw_p z#H9(~|2>l5N8CYtfcP4bP5^Qc7K8&)j(8kVi#UvU4)JqD3*rStFQOkYf*3{2BVI2B$1Ii*61 zQH5Oi75sP(cGbr^%A8+LXo>V=PpmeE^xpyn-gGwPE;|KS(?Z~FyBxKLJ9g0fGXSqH?{+jPIB;`7kXMpUQLtFup zSC@gK5^4osUkbF&qxLFl`@0O#is~Qhd;^49OujT|y(Vmf!(G@zV>QNq%w3G(sSXnZ razpz*Vq|yD+L)d>GXo2_WAL{mq$e6Xz$yt{iQ>j%(doFJ+X4O$W~1ry delta 3229 zcmZWre@t7~6~6aDsBL15T@oB1U`XdUBx4vV<0kHKje&MlG&s7Dcr8V?eXo;hq$g9FXqLs?t>~#e|cw z?gC>M=j3$yt=ykc_AvlfjMH(3h{ zgA113EW-cVw>_}zEwwH1&U)M6WD!z))A#|Gi^S1XUIhU>|LyHr;7 zD*st+@DcX*e)iTloSJaLref9|R0V{%;rfKL;Ednbb6%+mUg))XPPIPU;KK(d^NRMJ z#RI>r&!T zXf=t7fGT%#t9lY@=G=ySZo00)Wi;2WG40wItj;w{p=)4BG)rqLm4Gi%VOZoZS|$Ri zgdLJet6`CcSd74OvI3ft1&~SU;o+R=(6O=YsR_B;$?Oy46nTa;lc&g&+Q3)v(P*Jz zlLyL=VxVS&b)1P8qhPphht05cI-!$7V^EWFOLxa1fYdn-Z>8LjnbVtfY@c`etWBJC z1D8Gq%c%-uFXh#OKc?`a)PwsoPahn0DlWx;bF=&EY|h^ zGcFPz4t~UMKzuP)`BjLFTSv$uxcHfe#MyHe?_-dc^qN7_50mQevWa~!F6pT$ng-@g8q*zo>ojLW_5Tz08!%p&TV$)DdT-bXY;ljTf z4ln(oqi}kzZ^4fw1gIiGs>vgi#HdJqt`}1XGM76@I@u;i>>Ibk> zAS;SqKG1Nne2>gKdqt-ZeT4QBbqT%(t4vRmeMZG9Uuh_k`-^4td3O0#NlKFhl%w?} z!?LnpE@u%-xAn_VE|q4ZoYJ2b`zgI$>NjK;+vM*PS`tT06T07o2`T-A_y==H58&Vo z4h{tjq(`BWG0Q(f>F><`T+%bkePPX~ZbeOQm&>ai0VRRXBu+de2XLAda~`d)lriuM zu9nb#wLNTvUtf6)&fTuI*y30N_;@&P#M5_fyQ$PSjpa1``V##DUd%0o>X6Q|4on)F zXPf~T$+`q9p<|WSSvEv2Gj?&igR^IbjR$Abl_TBsJNno4D;p+7`?(^`lPsbmd89!9 z9+A+4t#nIkw>+zw2V3bDcqSZ>3EQL%^~aXfG6$8TcKqc9FPQ8Q9L+D%&9P*QLB<>0 zZkdaZ9E4wvmK6nWGLKL)ix7+@z1%LXB-FqgN&LlDmtOCQQmBqti*z@bEdMpe(d&$H zQg_6ycO;m(3C>7ikt4;dBh zmLWvPL}WfmJ+x@M,ZI2,<->,ZI2,< >,ZI2,<:>,ZI2,<:>,ZI2' ⎕FMT 1 6⍴⍵} + + ∇ {r}←Shutdown dummy + ⍝ Shut down slave + + DB.Close ⍝ Close the vecdb + ⎕EX 'DB' + done←1 ⍝ Global flag to shut down + r←⍬ ⍝ Need a result + ∇ + + ∇ Init(folder shards) + STATE←1 ⍝ Starting, 0=Running, 2=Startup Failed, 3=Shut Down + 1 Log STATUS←'Startup initiated at ',fmtts ⎕TS + CONNS←TASKS←USERS←TOKENS←⍬ + NEXTTASK←1000 + + :Trap 0 + DB←⎕NEW ##.vecdb(folder shards) + STATE←0 + 1 Log'Slave startup completed, ',STATUS←'Folder= ',folder,', shards= ',⍕shards + :Else + STATE←2 ⍝ Startup Failed + 3 Log STATUS←'Startup failed: ',∊⎕DM + :EndTrap + ∇ + + ∇ {r}←Start(folder shards port);sink;data;event;obj;rc;wait;z;cmd;name + ⍝ Run a vecdb Slave - based on CONGA RPCServer sample + + {}##.DRC.Init'' + {}##.DRC.Close name←'VECSRV' + + Init folder shards + + :If 0=1⊃r←##.DRC.Srv name''port'Command' + 1 Log'Server ''',name,''', listening on port ',⍕port + 2 Log'Handler thread started: ',⍕Run&name port + :Else + 3 Log'Server failed to start: ',,⍕r + :EndIf + ∇ + + ∇ Connect cmd;task;conn + ⍝ Connection Created + + conn←1↓⊃(cmd='.')⊂cmd + CONNS,←⊂conn + TASKS,←task←NEXTTASK + NEXTTASK←10000|NEXTTASK+1 + USERS←USERS,0 + TOKENS←TOKENS,⊂'' + + 0 Log'New connection ',conn,' assigned task id ',⍕task + ∇ + + ∇ Disconnect obj;m;i;held;task;conn + ⍝ Connection Lost + + conn←1↓⊃(obj='.')⊂obj + 0 Log'Connection ',conn,' disconnected' + + :If (⍴m)≥i←(m←~CONNS∊⊂conn)⍳0 + CONNS←m/CONNS + TASKS←m/TASKS + USERS←m/USERS + TOKENS←m/TOKENS + :EndIf + ∇ + + ∇ level Log message + →(level,ZI2,<:>,ZI2,<.>,ZI3'⎕FMT 1 4⍴3↓⎕TS),' ',message + ∇ + + ∇ Process(obj data);r;CONNECTION;cmd;arg;close;txt + ⍝ Process a call. data[1] contains function name, data[2] an argument + + ⍝ {}##.DRC.Progress obj(' Thread ',(⍕⎕TID),' started to run: ',,⍕data) ⍝ Send progress report + CONNECTION←obj + Conn←1↓⊃(obj='.')⊂obj + (cmd arg)←2↑data + close←0 + + :If (⊂cmd)∊'SetToken' 'SetUser' 'Shutdown' + r←0 (⍎cmd,' obj arg') + + :ElseIf (⊂cmd)∊'Append' 'Count' 'Query' 'Update' 'Read' + :If 0≠≢(CONNS⍳⊂Conn)⊃TOKENS,⊂'' + :Trap 9999 + :If cmd≡'Count' ⋄ r←0 DB.Count + :Else ⋄ r←0 ((DB⍎cmd) arg) + :EndIf + :Else ⋄ r←⎕EN ⎕DM + :EndTrap + :Else + close←1 + r←999 ('No valid token provided for command ',⍕cmd arg) + :EndIf + + :Else + r←999 ('Unsupported command: ',cmd) + :EndIf + + {}##.DRC.Respond obj r + + :If close + ⍝ /// {{}##.DRC.Close ⍵⊣⎕DL 1}&Conn ⍝ Start thread which waits 1s then closes + :EndIf + ∇ + + + ∇ r←Run(name port);sink;data;event;obj;rc;wait;z;cmd + ⍝ Run the Lock Server - based on CONGA RPCServer sample + + :If 0=⎕NC'start' ⋄ start←1 ⋄ :EndIf + {}##.DRC.Init'' + + 0 Log'Thread ',(⍕⎕TID),' is now handing server ''',name,'''.' + done←0 ⍝ done←1 in function "End" + :While ~done + rc obj event data←4↑wait←##.DRC.Wait name 3000 ⍝ Time out now and again + + :Select rc + :Case 0 + :Select event + :Case 'Error' + :If 1119≢data ⋄ 3 Log'Error ',(⍕data),' on ',obj ⋄ :EndIf + :If ~done∨←name≡obj ⍝ Error on the listener itself? + {}##.DRC.Close obj ⍝ Close connection in error + Disconnect obj ⍝ Let logic know + :EndIf + + :Case 'Receive' + :If 2≠⍴data ⍝ Command is expected to be (function name)(argument) + {}##.DRC.Respond obj(99999 'Bad command format') ⋄ :Leave + :EndIf + + Process obj data ⍝ NB Single-threaded + + :Case 'Connect' + Connect obj + + :Else ⍝ Unexpected result? + ∘ + :EndSelect + + :Case 100 ⍝ Time out - Insert code for housekeeping tasks here (deadlocks?) + + :Case 1010 ⍝ Object Not Found + 3 Log'Object ''',name,''' has been closed - RPC Server shutting down' ⋄ done←1 + + :Else + 3 Log'Error in RPC.Wait: ',⍕wait + :EndSelect + :EndWhile + ⎕DL 1 ⍝ Give responses time to complete + {}##.DRC.Close name + 0 Log'Server ',name,' terminated.' + + :If 2=⎕NC '#.AUTOSHUT' + :AndIf 0≠#.AUTOSHUT + ⎕OFF + :EndIf + ∇ + + ∇ task←SetUser(cmd User);i;Conn + ⍝ Return task ID + + Conn←1↓⊃(cmd='.')⊂cmd + + :If (⍴CONNS)