Skip to content

Commit

Permalink
fix bitset::count() bug and improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed Samy <[email protected]>
  • Loading branch information
asamy committed Aug 21, 2016
1 parent 91b4835 commit 77f56ed
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 97 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ DEP_DIR = dep
DEPFLAGS = -MT $@ -MMD -MP -MF $(DEP_DIR)/$*.d

CXX = $(CROSS_BUILD)g++
BTYPE = -O3
BTYPE = -O0 -g
CXXFLAGS = -std=c++0x $(DEPFLAGS) $(BTYPE) -fopenmp \
-Wall -Wextra -Wno-sign-compare -Wno-unused-variable -Wno-unused-parameter -I"."
-Wall -Wextra -Werror -Wno-unused-variable -Wno-unused-parameter -I"."

LIBS += -fopenmp -lboost_system -lboost_filesystem -lboost_program_options
ifeq ($(OS),Windows_NT)
Expand Down
4 changes: 2 additions & 2 deletions bencode/decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ VectorType Bencode::readVector()
return ret;

switch (byte) {
case 'i': ret.push_back(readInt()); break;
case 'i': ret.push_back(readUint()); break;
case 'l': ret.push_back(readVector()); break;
case 'd': ret.push_back(readDictionary()); break;
case 'e': return ret;
Expand Down Expand Up @@ -143,7 +143,7 @@ Dictionary Bencode::readDictionary()
return Dictionary();

switch (byte) {
case 'i': ret[key] = readInt(); break;
case 'i': ret[key] = readUint(); break;
case 'l': ret[key] = readVector(); break;
case 'd': ret[key] = readDictionary(); break;
default:
Expand Down
20 changes: 7 additions & 13 deletions ctorrent/peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void Peer::verify()
m_peerId = peerId;
m_conn->write(m_handshake, 68);
m_torrent->handleNewPeer(shared_from_this());
m_eventId = g_sched.addEvent(std::bind(&Peer::sendKeepAlive, shared_from_this()), KEEPALIVE_INTERVAL);
// m_eventId = g_sched.addEvent(std::bind(&Peer::sendKeepAlive, shared_from_this()), KEEPALIVE_INTERVAL);
m_conn->read(4, std::bind(&Peer::handle, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
});
}
Expand Down Expand Up @@ -226,7 +226,7 @@ void Peer::handleMessage(MessageType messageType, InputMessage in)
return handleError("received too big piece block of size " + bytesToHumanReadable(payloadSize, true));

auto it = std::find_if(m_queue.begin(), m_queue.end(),
[index](const Piece *piece) { return piece->index == index; });
[index](const Piece *piece) { return piece->index == index; });
if (it == m_queue.end())
return handleError("received piece " + std::to_string(index) + " which we did not ask for");

Expand All @@ -240,17 +240,11 @@ void Peer::handleMessage(MessageType messageType, InputMessage in)
m_queue.erase(it);
delete piece;
} else {
PieceBlock *block = &piece->blocks[blockIndex];
uint8_t *payload = in.getBuffer(payloadSize);
if (block->rpos != 0) {
memcpy(block->data, payload, std::max(payloadSize, block->size - block->rpos));
free(payload);
block->rpos += payloadSize;
} else {
block->size = block->rpos = payloadSize;
block->data = payload;
++piece->currentBlocks;
}
PieceBlock *block = &piece->blocks[blockIndex];
block->size = payloadSize;
block->data = payload;
++piece->currentBlocks;

if (piece->currentBlocks == piece->numBlocks) {
DataBuffer<uint8_t> pieceData;
Expand Down Expand Up @@ -311,7 +305,7 @@ void Peer::handlePieceBlockData(size_t index, size_t begin, const uint8_t *block
{
// Check if piece block cancel was issued
auto it = std::find_if(m_requestedBlocks.begin(), m_requestedBlocks.end(),
[=] (const PieceBlockInfo &i) { return i.index == index && i.begin == begin; } );
[=] (const PieceBlockInfo &i) { return i.index == index && i.begin == begin; } );
if (it == m_requestedBlocks.end())
return;

Expand Down
7 changes: 1 addition & 6 deletions ctorrent/peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@ class Peer : public std::enable_shared_from_this<Peer>
void connect(const std::string &ip, const std::string &port);

protected:
// Process piece blocks requested, send keepalive, etc.
void process();
// Used only in server verification (e.g. peer connected to us)
void verify();

void handle(const uint8_t *data, size_t size);
void handleMessage(MessageType mType, InputMessage in);
void handleError(const std::string &errmsg);
Expand Down Expand Up @@ -100,10 +96,9 @@ class Peer : public std::enable_shared_from_this<Peer>
private:
struct PieceBlock {
size_t size;
size_t rpos;
uint8_t *data;

PieceBlock() { data = nullptr; size = rpos = 0; }
PieceBlock() { data = nullptr; size = 0; }
~PieceBlock() { delete []data; }
};

Expand Down
47 changes: 29 additions & 18 deletions ctorrent/torrent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
#include <iostream>
#include <thread>
#include <random>
#include <fstream>

extern std::ofstream logfile;

Torrent::Torrent()
: m_listener(nullptr),
Expand Down Expand Up @@ -159,12 +162,10 @@ bool Torrent::checkTrackers()
bool Torrent::nextConnection()
{
if (m_listener) {
m_listener->accept(
[this] (const ConnectionPtr &c) {
auto peer = std::make_shared<Peer>(c, this);
peer->verify();
}
);
m_listener->accept([this] (const ConnectionPtr &c) {
auto peer = std::make_shared<Peer>(c, this);
peer->verify();
});

return true;
}
Expand All @@ -175,7 +176,7 @@ bool Torrent::nextConnection()
bool Torrent::queryTrackers(const TrackerQuery &query, uint16_t port)
{
bool success = queryTracker(m_meta.tracker(), query, port);
if (m_meta.trackers().empty())
if (success)
return success;

for (const boost::any &s : m_meta.trackers()) {
Expand All @@ -186,10 +187,10 @@ bool Torrent::queryTrackers(const TrackerQuery &query, uint16_t port)
return true;
} else if (s.type() == typeid(std::string) &&
queryTracker(Bencode::cast<std::string>(&s), query, port))
return true;
return true;
}

return success;
return false;
}

bool Torrent::queryTracker(const std::string &furl, const TrackerQuery &q, uint16_t tport)
Expand Down Expand Up @@ -293,6 +294,7 @@ void Torrent::addPeer(const PeerPtr &peer)
if (it != m_blacklisted.end())
m_blacklisted.erase(it);

logfile << peer->getIP() << ": now connected" << std::endl;
m_peers.insert(std::make_pair(peer->ip(), peer));
}

Expand All @@ -301,6 +303,8 @@ void Torrent::removePeer(const PeerPtr &peer, const std::string &errmsg)
auto it = m_peers.find(peer->ip());
if (it != m_peers.end())
m_peers.erase(it);

logfile << peer->getIP() << ": closing: " << errmsg << std::endl;
}

void Torrent::disconnectPeers()
Expand All @@ -325,51 +329,58 @@ void Torrent::sendBitfield(const PeerPtr &peer)

void Torrent::requestPiece(const PeerPtr &peer)
{
size_t index = m_fileManager.getPieceforRequest([peer] (size_t i) { return peer->hasPiece(i); });
size_t index = m_fileManager.getPieceforRequest(std::bind(&Peer::hasPiece, peer, std::placeholders::_1));
if (index != std::numeric_limits<size_t>::max())
peer->sendPieceRequest(index);
}

bool Torrent::handlePieceCompleted(const PeerPtr &peer, uint32_t index, DataBuffer<uint8_t> &&data)
{
uint32_t downloaded = data.size();
if (m_fileManager.writePieceBlock(index, peer->ip(), std::move(data))) {
m_downloadedBytes += downloaded;
logfile << peer->getIP() << ": finished downloading piece: " << index << std::endl;
if (m_fileManager.writePieceBlock(index, peer->ip(), std::move(data)))
return true;
}

m_wastedBytes += downloaded;
m_wastedBytes += data.size();
++m_hashMisses;
return false;
}

bool Torrent::handleRequestBlock(const PeerPtr &peer, uint32_t index, uint32_t begin, uint32_t length)
{
logfile << peer->getIP() << ": Requested piece block: " << index << std::endl;
return m_fileManager.requestPieceBlock(index, peer->ip(), begin, length);
}

void Torrent::onPieceWriteComplete(uint32_t from, uint32_t index)
void Torrent::onPieceWriteComplete(uint32_t from, size_t index)
{
logfile << ip2str(from) << ": Finished writing piece: " << index << std::endl;
logfile << "Pieces so far: " << m_fileManager.completedPieces() << "/" << m_fileManager.totalPieces() << std::endl;

m_downloadedBytes += m_fileManager.pieceSize(index);
for (const auto &it : m_peers)
if (!it.second->hasPiece(index))
if (it.second->ip() != from && !it.second->hasPiece(index))
it.second->sendHave(index);
}

void Torrent::onPieceReadComplete(uint32_t from, uint32_t index, uint32_t begin, uint8_t *block, size_t size)
void Torrent::onPieceReadComplete(uint32_t from, size_t index, int64_t begin, uint8_t *block, size_t size)
{
auto it = m_peers.find(from);
if (it != m_peers.end()) {
it->second->sendPieceBlock(index, begin, block, size);
m_uploadedBytes += size;
}

delete []block;
}

void Torrent::handleTrackerError(Tracker *tracker, const std::string &error)
{
logfile << tracker->host() << ": (T): " << error << std::endl;
}

void Torrent::handlePeerDebug(const PeerPtr &peer, const std::string &msg)
{
logfile << peer->getIP() << ": " << msg << std::endl;
}

void Torrent::handleNewPeer(const PeerPtr &peer)
Expand Down
8 changes: 4 additions & 4 deletions ctorrent/torrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class Torrent
clock_t elapsed();

// Get associated meta info for this torrent
const TorrentMeta *meta() const { return &m_meta; }
TorrentMeta *meta() { return &m_meta; }

// Get associated file manager for this torrent
const TorrentFileManager *fileManager() const { return &m_fileManager; }
TorrentFileManager *fileManager() { return &m_fileManager; }

protected:
bool queryTrackers(const TrackerQuery &r, uint16_t port);
Expand Down Expand Up @@ -108,8 +108,8 @@ class Torrent

public:
// TorrentFileManager -> Torrent
void onPieceWriteComplete(uint32_t from, uint32_t index);
void onPieceReadComplete(uint32_t from, uint32_t index, uint32_t begin, uint8_t *block, size_t size);
void onPieceWriteComplete(uint32_t from, size_t index);
void onPieceReadComplete(uint32_t from, size_t index, int64_t begin, uint8_t *block, size_t size);

private:
Server *m_listener;
Expand Down
Loading

0 comments on commit 77f56ed

Please sign in to comment.