Skip to content

Commit af2c685

Browse files
committed
Simplified the Mailbox/MailboxDir API
... by combining Mailbox::close(), Mailbox::deliver() and MailboxDir::open() into MailboxDir::deliver().
1 parent a5fc925 commit af2c685

File tree

5 files changed

+59
-81
lines changed

5 files changed

+59
-81
lines changed

include/Homa/Homa.h

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -305,27 +305,6 @@ class Mailbox {
305305
*/
306306
virtual ~Mailbox() = default;
307307

308-
/**
309-
* Signal that the caller will not access the mailbox after this call.
310-
* A mailbox will only be destroyed if it's removed from the directory
311-
* and closed by all openers.
312-
*
313-
* Not meant to be called by users.
314-
*
315-
* @sa MailboxDir::open()
316-
*/
317-
virtual void close() = 0;
318-
319-
/**
320-
* Used by a transport to deliver an ingress message to this mailbox.
321-
*
322-
* Not meant to be called by users.
323-
*
324-
* @param message
325-
* An ingress message just completed by the transport.
326-
*/
327-
virtual void deliver(InMessage* message) = 0;
328-
329308
/**
330309
* Retrieve a message currently stored in the mailbox.
331310
*
@@ -382,17 +361,19 @@ class MailboxDir {
382361
virtual Mailbox* alloc(uint16_t port) = 0;
383362

384363
/**
385-
* Find and open the mailbox that matches the given port number. Once a
386-
* mailbox is opened, it's guaranteed to remain usable even if someone else
387-
* removes it from the directory.
364+
* Used by a transport to deliver an ingress message to a mailbox.
365+
*
366+
* Not meant to be called by users.
388367
*
389368
* @param port
390369
* Port number which identifies the mailbox.
370+
* @param message
371+
* An ingress message just completed by the transport.
391372
* @return
392-
* Pointer to the opened mailbox on success; nullptr, if the desired
373+
* True if the message is delivered successfully; false, if the target
393374
* mailbox doesn't exist.
394375
*/
395-
virtual Mailbox* open(uint16_t port) = 0;
376+
virtual bool deliver(uint16_t port, InMessage* message) = 0;
396377

397378
/**
398379
* Remove the mailbox that matches the given port number.

include/Homa/Utils/SimpleMailboxDir.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ class SimpleMailboxDir final : public MailboxDir {
4747
explicit SimpleMailboxDir();
4848
~SimpleMailboxDir() override;
4949
Mailbox* alloc(uint16_t port) override;
50-
Mailbox* open(uint16_t port) override;
50+
bool deliver(uint16_t port, Homa::InMessage* message) override;
5151
bool remove(uint16_t port) override;
5252

5353
private:
5454
/// Monitor-style lock.
5555
std::unique_ptr<SpinLock> mutex;
5656

57-
/// Hash table that maps port numbers to mailboxes.
57+
/// Hash table that maps port numbers to mailboxes. Protected by mutex.
5858
std::unordered_map<uint16_t, MailboxImpl*> map;
5959
};
6060

src/Receiver.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,8 @@ Receiver::handleDataPacket(Driver::Packet* packet, IpAddress sourceIp)
165165
message->setState(Message::State::COMPLETED);
166166
bucket->resendTimeouts.cancelTimeout(&message->resendTimeout);
167167
uint16_t dport = be16toh(header->common.prefix.dport);
168-
Mailbox* mailbox = mailboxDir->open(dport);
169-
if (mailbox) {
170-
mailbox->deliver(message);
171-
mailbox->close();
172-
} else {
168+
bool success = mailboxDir->deliver(dport, message);
169+
if (!success) {
173170
lock_bucket.destroy();
174171
ERROR("Unable to deliver the message; message dropped");
175172
dropMessage(message);

src/Shenango.cc

Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,6 @@ using namespace Homa;
2626
#define DECLARE_SHENANGO_FUNC(ReturnType, MethodName, ...) \
2727
extern ReturnType (*shenango_##MethodName)(__VA_ARGS__);
2828

29-
/**
30-
* Fast thread-local slab-based memory allocation.
31-
*/
32-
DECLARE_SHENANGO_FUNC(void*, smalloc, size_t)
33-
DECLARE_SHENANGO_FUNC(void, sfree, void*)
34-
3529
/**
3630
* Protect RCU read-side critical sections.
3731
*/
@@ -165,30 +159,13 @@ homa_driver_free(homa_driver drv)
165159
}
166160

167161
/**
168-
* An almost trivial implementation of Mailbox. This class is essentially
169-
* a wrapper around a socket table entry in Shenango (i.e., struct trans_entry).
170-
*
162+
* A trivial implementation of Mailbox for catching errors.
171163
*/
172164
class ShenangoMailbox final : public Mailbox {
173165
public:
174-
explicit ShenangoMailbox(void* trans_entry)
175-
: trans_entry(trans_entry)
176-
{}
177-
166+
explicit ShenangoMailbox() = default;
178167
~ShenangoMailbox() override = default;
179168

180-
void close() override
181-
{
182-
this->~ShenangoMailbox();
183-
shenango_sfree(this);
184-
shenango_rcu_read_unlock();
185-
}
186-
187-
void deliver(InMessage* message) override
188-
{
189-
shenango_homa_mb_deliver(trans_entry, homa_inmsg{message});
190-
}
191-
192169
InMessage* retrieve(bool blocking) override
193170
{
194171
(void)blocking;
@@ -199,10 +176,6 @@ class ShenangoMailbox final : public Mailbox {
199176
{
200177
PANIC("Shenango should never call Homa::Socket::shutdown");
201178
}
202-
203-
private:
204-
/// An opaque pointer to "struct trans_entry" in Shenango.
205-
void* const trans_entry;
206179
};
207180

208181
/**
@@ -226,21 +199,22 @@ class ShenangoMailboxDir final : MailboxDir {
226199
{
227200
// Shenango doesn't rely on Homa::Socket to receive messages,
228201
// so there is no need to assign a real mailbox to SocketImpl.
229-
static ShenangoMailbox dummyMailbox(nullptr);
202+
static ShenangoMailbox dummyMailbox;
230203
(void)port;
231204
return &dummyMailbox;
232205
}
233206

234-
Mailbox* open(uint16_t port) override
207+
bool deliver(uint16_t port, InMessage* message) override
235208
{
236-
SocketAddress laddr = {local_ip, port};
209+
// The socket table in Shenango is protected by an RCU.
237210
shenango_rcu_read_lock();
211+
SocketAddress laddr = {local_ip, port};
238212
void* trans_entry = shenango_trans_table_lookup(proto, laddr, {});
239-
if (!trans_entry) {
240-
return nullptr;
213+
if (trans_entry) {
214+
shenango_homa_mb_deliver(trans_entry, homa_inmsg{message});
241215
}
242-
void* backing = shenango_smalloc(sizeof(ShenangoMailbox));
243-
return new (backing) ShenangoMailbox(trans_entry);
216+
shenango_rcu_read_unlock();
217+
return trans_entry != nullptr;
244218
}
245219

246220
bool remove(uint16_t port) override

src/SimpleMailboxDir.cc

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ class MailboxImpl : public Mailbox {
2727
public:
2828
explicit MailboxImpl();
2929
~MailboxImpl() override;
30-
void close() override;
31-
void deliver(InMessage* message) override;
30+
void open();
31+
void close();
32+
void deliver(InMessage* message);
3233
InMessage* retrieve(bool blocking) override;
3334
void socketShutdown() override;
3435

@@ -63,7 +64,24 @@ MailboxImpl::~MailboxImpl()
6364
}
6465
}
6566

66-
/// See Homa::Mailbox::close()
67+
/**
68+
* Signal that the caller will be accessing the mailbox until close() is called.
69+
* Once a mailbox is opened, it's guaranteed to remain usable even if someone
70+
* else removes it from the directory.
71+
*/
72+
void
73+
MailboxImpl::open()
74+
{
75+
// Increment the reference count of the mailbox, so this mailbox won't be
76+
// deleted even if it's removed from the hash table.
77+
openers.fetch_add(1, std::memory_order_relaxed);
78+
}
79+
80+
/**
81+
* Signal that the caller will not access the mailbox after this call.
82+
* A mailbox will only be destroyed if it's removed from the directory
83+
* and closed by all openers.
84+
*/
6785
void
6886
MailboxImpl::close()
6987
{
@@ -75,7 +93,12 @@ MailboxImpl::close()
7593
}
7694
}
7795

78-
/// See Homa::Mailbox::deliver()
96+
/**
97+
* Deliver an ingress message to this mailbox.
98+
*
99+
* @param message
100+
* An ingress message just completed by the transport.
101+
*/
79102
void
80103
MailboxImpl::deliver(InMessage* message)
81104
{
@@ -131,24 +154,27 @@ SimpleMailboxDir::alloc(uint16_t port)
131154
return mailbox;
132155
}
133156

134-
Mailbox*
135-
SimpleMailboxDir::open(uint16_t port)
157+
bool
158+
SimpleMailboxDir::deliver(uint16_t port, Homa::InMessage* message)
136159
{
160+
// Find the mailbox.
137161
MailboxImpl* mailbox = nullptr;
138162
{
139-
// Look up the mailbox
140163
SpinLock::Lock _(*mutex);
141164
auto it = map.find(port);
142165
if (it != map.end()) {
143166
mailbox = it->second;
144167
}
168+
if (mailbox == nullptr) {
169+
return false;
170+
}
145171
}
146172

147-
// Increment the reference count of the mailbox.
148-
if (mailbox) {
149-
mailbox->openers.fetch_add(1, std::memory_order_relaxed);
150-
}
151-
return mailbox;
173+
// Deliver the message.
174+
mailbox->open();
175+
mailbox->deliver(message);
176+
mailbox->close();
177+
return true;
152178
}
153179

154180
bool

0 commit comments

Comments
 (0)