Skip to content

Commit b1e63d5

Browse files
authored
feat: use generic event publisher for mysql consumer (#88)
1 parent c577764 commit b1e63d5

File tree

2 files changed

+123
-121
lines changed

2 files changed

+123
-121
lines changed
+46-45
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,63 @@
11
CREATE TABLE IF NOT EXISTS courses (
2-
id CHAR(36) NOT NULL,
3-
name VARCHAR(255) NOT NULL,
4-
duration VARCHAR(255) NOT NULL,
5-
PRIMARY KEY (id)
2+
id CHAR(36) NOT NULL,
3+
name VARCHAR(255) NOT NULL,
4+
duration VARCHAR(255) NOT NULL,
5+
PRIMARY KEY (id)
66
)
7-
ENGINE = InnoDB
8-
DEFAULT CHARSET = utf8mb4
9-
COLLATE = utf8mb4_unicode_ci;
7+
ENGINE = InnoDB
8+
DEFAULT CHARSET = utf8mb4
9+
COLLATE = utf8mb4_unicode_ci;
1010

1111
CREATE TABLE IF NOT EXISTS courses_counter (
12-
id CHAR(36) NOT NULL,
13-
total INT NOT NULL,
14-
existing_courses JSON NOT NULL,
15-
PRIMARY KEY (id)
12+
id CHAR(36) NOT NULL,
13+
total INT NOT NULL,
14+
existing_courses JSON NOT NULL,
15+
PRIMARY KEY (id)
1616
)
17-
ENGINE = InnoDB
18-
DEFAULT CHARSET = utf8mb4
19-
COLLATE = utf8mb4_unicode_ci;
20-
INSERT IGNORE INTO courses_counter (id, total, existing_courses) VALUES ('efbaff16-8fcd-4689-9fc9-ec545d641c46', 0, '[]');
17+
ENGINE = InnoDB
18+
DEFAULT CHARSET = utf8mb4
19+
COLLATE = utf8mb4_unicode_ci;
20+
INSERT IGNORE INTO courses_counter (id, total, existing_courses)
21+
VALUES ('efbaff16-8fcd-4689-9fc9-ec545d641c46', 0, '[]');
2122

2223
CREATE TABLE IF NOT EXISTS steps (
23-
id CHAR(36) NOT NULL,
24-
title VARCHAR(155) NOT NULL,
25-
PRIMARY KEY (id)
24+
id CHAR(36) NOT NULL,
25+
title VARCHAR(155) NOT NULL,
26+
PRIMARY KEY (id)
2627
)
27-
ENGINE = InnoDB
28-
DEFAULT CHARSET = utf8mb4
29-
COLLATE = utf8mb4_unicode_ci;
28+
ENGINE = InnoDB
29+
DEFAULT CHARSET = utf8mb4
30+
COLLATE = utf8mb4_unicode_ci;
3031

3132
CREATE TABLE IF NOT EXISTS steps_challenges (
32-
id CHAR(36) NOT NULL,
33-
statement TEXT NOT NULL,
34-
PRIMARY KEY (id),
35-
CONSTRAINT fk_steps_challenges__step_id FOREIGN KEY (id) REFERENCES steps(id)
33+
id CHAR(36) NOT NULL,
34+
statement TEXT NOT NULL,
35+
PRIMARY KEY (id),
36+
CONSTRAINT fk_steps_challenges__step_id FOREIGN KEY (id) REFERENCES steps(id)
3637
)
37-
ENGINE = InnoDB
38-
DEFAULT CHARSET = utf8mb4
39-
COLLATE = utf8mb4_unicode_ci;
38+
ENGINE = InnoDB
39+
DEFAULT CHARSET = utf8mb4
40+
COLLATE = utf8mb4_unicode_ci;
4041

4142
CREATE TABLE IF NOT EXISTS steps_videos (
42-
id CHAR(36) NOT NULL,
43-
url VARCHAR(255) NOT NULL,
44-
text TEXT NOT NULL,
45-
PRIMARY KEY (id),
46-
CONSTRAINT fk_steps_video__step_id FOREIGN KEY (id) REFERENCES steps(id)
43+
id CHAR(36) NOT NULL,
44+
url VARCHAR(255) NOT NULL,
45+
text TEXT NOT NULL,
46+
PRIMARY KEY (id),
47+
CONSTRAINT fk_steps_video__step_id FOREIGN KEY (id) REFERENCES steps(id)
4748
)
48-
ENGINE = InnoDB
49-
DEFAULT CHARSET = utf8mb4
50-
COLLATE = utf8mb4_unicode_ci;
49+
ENGINE = InnoDB
50+
DEFAULT CHARSET = utf8mb4
51+
COLLATE = utf8mb4_unicode_ci;
5152

5253
CREATE TABLE IF NOT EXISTS domain_events (
53-
id CHAR(36) NOT NULL,
54-
aggregate_id CHAR(36) NOT NULL,
55-
name VARCHAR(255) NOT NULL,
56-
body JSON NOT NULL,
57-
occurred_on TIMESTAMP NOT NULL,
58-
PRIMARY KEY (id)
54+
id CHAR(36) NOT NULL,
55+
aggregate_id CHAR(36) NOT NULL,
56+
name VARCHAR(255) NOT NULL,
57+
body JSON NOT NULL,
58+
occurred_on TIMESTAMP NOT NULL,
59+
PRIMARY KEY (id)
5960
)
60-
ENGINE = InnoDB
61-
DEFAULT CHARSET = utf8mb4
62-
COLLATE = utf8mb4_unicode_ci;
61+
ENGINE = InnoDB
62+
DEFAULT CHARSET = utf8mb4
63+
COLLATE = utf8mb4_unicode_ci;

src/shared/main/tv/codely/shared/infrastructure/bus/event/mysql/MySqlDomainEventsConsumer.java

+77-76
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import org.springframework.beans.factory.annotation.Qualifier;
77
import tv.codely.shared.domain.Utils;
88
import tv.codely.shared.domain.bus.event.DomainEvent;
9+
import tv.codely.shared.domain.bus.event.EventBus;
910
import tv.codely.shared.infrastructure.bus.event.DomainEventsInformation;
10-
import tv.codely.shared.infrastructure.bus.event.spring.SpringApplicationEventBus;
1111

1212
import java.lang.reflect.InvocationTargetException;
1313
import java.lang.reflect.Method;
@@ -17,79 +17,80 @@
1717
import java.util.List;
1818

1919
public class MySqlDomainEventsConsumer {
20-
private final SessionFactory sessionFactory;
21-
private final DomainEventsInformation domainEventsInformation;
22-
private final SpringApplicationEventBus bus;
23-
private final Integer CHUNKS = 200;
24-
private Boolean shouldStop = false;
25-
26-
public MySqlDomainEventsConsumer(
27-
@Qualifier("mooc-session_factory") SessionFactory sessionFactory,
28-
DomainEventsInformation domainEventsInformation,
29-
SpringApplicationEventBus bus
30-
) {
31-
this.sessionFactory = sessionFactory;
32-
this.domainEventsInformation = domainEventsInformation;
33-
this.bus = bus;
34-
}
35-
36-
@Transactional
37-
public void consume() {
38-
while (!shouldStop) {
39-
NativeQuery query = sessionFactory.getCurrentSession().createNativeQuery(
40-
"SELECT * FROM domain_events ORDER BY occurred_on ASC LIMIT :chunk"
41-
);
42-
43-
query.setParameter("chunk", CHUNKS);
44-
45-
List<Object[]> events = query.list();
46-
47-
try {
48-
for (Object[] event : events) {
49-
executeSubscribers(
50-
(String) event[0],
51-
(String) event[1],
52-
(String) event[2],
53-
(String) event[3],
54-
(Timestamp) event[4]
55-
);
56-
}
57-
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
58-
e.printStackTrace();
59-
}
60-
61-
sessionFactory.getCurrentSession().clear();
62-
}
63-
}
64-
65-
public void stop() {
66-
shouldStop = true;
67-
}
68-
69-
private void executeSubscribers(
70-
String id, String aggregateId, String eventName, String body, Timestamp occurredOn
71-
) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
72-
73-
Class<? extends DomainEvent> domainEventClass = domainEventsInformation.forName(eventName);
74-
75-
DomainEvent nullInstance = domainEventClass.getConstructor().newInstance();
76-
77-
Method fromPrimitivesMethod = domainEventClass.getMethod(
78-
"fromPrimitives",
79-
String.class,
80-
HashMap.class,
81-
String.class,
82-
String.class
83-
);
84-
85-
Object domainEvent = fromPrimitivesMethod.invoke(
86-
nullInstance,
87-
aggregateId,
88-
Utils.jsonDecode(body),
89-
id,
90-
Utils.dateToString(occurredOn)
91-
);
92-
93-
bus.publish(Collections.singletonList((DomainEvent) domainEvent));
94-
}
20+
private final SessionFactory sessionFactory;
21+
private final DomainEventsInformation domainEventsInformation;
22+
private final EventBus bus;
23+
private final Integer CHUNKS = 200;
24+
private Boolean shouldStop = false;
25+
26+
public MySqlDomainEventsConsumer(
27+
@Qualifier("mooc-session_factory") SessionFactory sessionFactory,
28+
DomainEventsInformation domainEventsInformation,
29+
EventBus bus
30+
) {
31+
this.sessionFactory = sessionFactory;
32+
this.domainEventsInformation = domainEventsInformation;
33+
this.bus = bus;
34+
}
35+
36+
@Transactional
37+
public void consume() {
38+
while (!shouldStop) {
39+
NativeQuery query = sessionFactory.getCurrentSession().createNativeQuery(
40+
"SELECT * FROM domain_events ORDER BY occurred_on ASC LIMIT :chunk"
41+
);
42+
43+
query.setParameter("chunk", CHUNKS);
44+
45+
List<Object[]> events = query.list();
46+
47+
try {
48+
for (Object[] event : events) {
49+
executeSubscribers(
50+
(String) event[0],
51+
(String) event[1],
52+
(String) event[2],
53+
(String) event[3],
54+
(Timestamp) event[4]
55+
);
56+
}
57+
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException |
58+
InstantiationException e) {
59+
e.printStackTrace();
60+
}
61+
62+
sessionFactory.getCurrentSession().clear();
63+
}
64+
}
65+
66+
public void stop() {
67+
shouldStop = true;
68+
}
69+
70+
private void executeSubscribers(
71+
String id, String aggregateId, String eventName, String body, Timestamp occurredOn
72+
) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
73+
74+
Class<? extends DomainEvent> domainEventClass = domainEventsInformation.forName(eventName);
75+
76+
DomainEvent nullInstance = domainEventClass.getConstructor().newInstance();
77+
78+
Method fromPrimitivesMethod = domainEventClass.getMethod(
79+
"fromPrimitives",
80+
String.class,
81+
HashMap.class,
82+
String.class,
83+
String.class
84+
);
85+
86+
Object domainEvent = fromPrimitivesMethod.invoke(
87+
nullInstance,
88+
aggregateId,
89+
Utils.jsonDecode(body),
90+
id,
91+
Utils.dateToString(occurredOn)
92+
);
93+
94+
bus.publish(Collections.singletonList((DomainEvent) domainEvent));
95+
}
9596
}

0 commit comments

Comments
 (0)