29
29
import edu .berkeley .ground .model .usage .LineageGraphVersion ;
30
30
import edu .berkeley .ground .model .versions .GroundType ;
31
31
import edu .berkeley .ground .util .IdGenerator ;
32
+ import org .slf4j .Logger ;
33
+ import org .slf4j .LoggerFactory ;
32
34
33
35
import java .util .ArrayList ;
36
+ import java .util .HashSet ;
34
37
import java .util .List ;
35
38
import java .util .Map ;
36
-
37
- import org .slf4j .Logger ;
38
- import org .slf4j .LoggerFactory ;
39
+ import java .util .Set ;
40
+ import java .util .stream .Collectors ;
39
41
40
42
public class CassandraLineageGraphVersionFactory
41
43
extends CassandraRichVersionFactory <LineageGraphVersion >
@@ -104,13 +106,12 @@ public LineageGraphVersion create(Map<String, Tag> tags,
104
106
this .dbClient .insert ("lineage_graph_version" , insertions );
105
107
106
108
for (long lineageEdgeVersionId : lineageEdgeVersionIds ) {
107
- List <DbDataContainer > lineageEdgeInsertion = new ArrayList <>();
108
- lineageEdgeInsertion .add (new DbDataContainer ("lineage_graph_version_id" , GroundType .LONG ,
109
- id ));
110
- lineageEdgeInsertion .add (new DbDataContainer ("lineage_edge_version_id" , GroundType .LONG ,
111
- lineageEdgeVersionId ));
109
+ List <DbDataContainer > predicates = new ArrayList <>();
110
+ predicates .add (new DbDataContainer ("id" , GroundType .LONG , id ));
111
+ Set <Long > edgeValue = new HashSet <>();
112
+ edgeValue .add (lineageEdgeVersionId );
112
113
113
- this .dbClient .insert ( "lineage_graph_version_edge " , lineageEdgeInsertion );
114
+ this .dbClient .addToSet ( "lineage_graph_version " , "lineage_edge_version_id_set" , edgeValue , predicates );
114
115
}
115
116
116
117
this .lineageGraphFactory .update (lineageGraphId , id , parentIds );
@@ -137,8 +138,8 @@ public LineageGraphVersion retrieveFromDatabase(long id) throws GroundException
137
138
List <DbDataContainer > predicates = new ArrayList <>();
138
139
predicates .add (new DbDataContainer ("id" , GroundType .LONG , id ));
139
140
140
- List <DbDataContainer > lineageEdgePredicate = new ArrayList <>();
141
- lineageEdgePredicate .add (new DbDataContainer ("lineage_graph_version_id " , GroundType .LONG ,
141
+ List <DbDataContainer > lineageGraphVersionPredicates = new ArrayList <>();
142
+ lineageGraphVersionPredicates .add (new DbDataContainer ("id " , GroundType .LONG ,
142
143
id ));
143
144
144
145
CassandraResults resultSet = this .dbClient .equalitySelect ("lineage_graph_version" ,
@@ -148,15 +149,7 @@ public LineageGraphVersion retrieveFromDatabase(long id) throws GroundException
148
149
149
150
long lineageGraphId = resultSet .getLong ("lineage_graph_id" );
150
151
151
- List <Long > lineageEdgeVersionIds = new ArrayList <>();
152
- CassandraResults lineageEdgeSet = this .dbClient .equalitySelect ("lineage_graph_version_edge" ,
153
- DbClient .SELECT_STAR , lineageEdgePredicate );
154
-
155
- if (!lineageEdgeSet .isEmpty ()) {
156
- do {
157
- lineageEdgeVersionIds .add (lineageEdgeSet .getLong ("lineage_edge_version_id" ));
158
- } while (lineageEdgeSet .next ());
159
- }
152
+ List <Long > lineageEdgeVersionIds = resultSet .getSet ("lineage_edge_version_id_set" , Long .class ).stream ().collect (Collectors .toList ());
160
153
161
154
LOGGER .info ("Retrieved lineage_graph version " + id + " in lineage_graph " + lineageGraphId
162
155
+ "." );
0 commit comments