diff --git a/core/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java b/core/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java index 7ebc802a..1188f9f5 100644 --- a/core/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java +++ b/core/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java @@ -4,7 +4,7 @@ public class PersistentMemoryPool { static { - System.load("/usr/local/lib/libjnipmdk.so"); + System.load("/usr/local/lib/libjnipmdk.so"); } private static native long nativeOpenDevice(String path, long size); private static native void nativeSetBlock(long deviceHandler, String key, ByteBuffer byteBuffer, int size, boolean clean); @@ -13,6 +13,7 @@ public class PersistentMemoryPool { private static native void nativeDeleteBlock(long deviceHandler, String key); private static native long nativeGetRoot(long deviceHandler); private static native int nativeCloseDevice(long deviceHandler); + private static native long nativeRemoveBlock(long deviceHandler, String key); private static final long DEFAULT_PMPOOL_SIZE = 0L; @@ -41,6 +42,10 @@ public void deletePartition(String key) { nativeDeleteBlock(this.deviceHandler, key); } + public long removeBlock(String key) { + return nativeRemoveBlock(this.deviceHandler, key); + } + public long getRootAddr() { return nativeGetRoot(this.deviceHandler); } diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleBlockResolver.scala index 98845ff9..481de084 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleBlockResolver.scala @@ -24,4 +24,14 @@ private[spark] class PmemShuffleBlockResolver( PersistentMemoryHandler.stop() super.stop() } + + override def removeDataByMap(shuffleId: Int, mapId: Int): Unit ={ + val partitionNumber = conf.get("spark.sql.shuffle.partitions") + val persistentMemoryHandler = PersistentMemoryHandler.getPersistentMemoryHandler + + for (i <- 0 to partitionNumber.toInt){ + val key = "shuffle_" + shuffleId + "_" + mapId + "_" + i + persistentMemoryHandler.removeBlock(key) + } + } } diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala index 9f8100b7..fad40873 100644 --- a/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala +++ b/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala @@ -75,6 +75,10 @@ private[spark] class PersistentMemoryHandler( pmpool.deletePartition(blockId) } + def removeBlock(blockId: String): Long = { + pmpool.removeBlock(blockId) + } + def getPartitionManagedBuffer(blockId: String): ManagedBuffer = { new PmemManagedBuffer(this, blockId) } diff --git a/native/src/010-TestCasePersistentMemoryPool.cpp b/native/src/010-TestCasePersistentMemoryPool.cpp index 67c96f2f..65a35265 100644 --- a/native/src/010-TestCasePersistentMemoryPool.cpp +++ b/native/src/010-TestCasePersistentMemoryPool.cpp @@ -21,6 +21,12 @@ int test_multithread_put(uint64_t index, pmemkv* kv) { return 0; } +int test_multithread_remove(uint64_t index, pmemkv* kv) { + std::string key = std::to_string(index); + kv->remove(key); + return 0; +} + TEST_CASE( "PmemBuffer operations", "[PmemBuffer]" ) { char data[LENGTH] = {}; memset(data, 'a', LENGTH); @@ -52,7 +58,8 @@ TEST_CASE( "PmemBuffer operations", "[PmemBuffer]" ) { for (char c = 'a'; c < 'f'; c++) { memset(data + (c - 'a') * 3, c, 3); } - /*data should be "aaabbbcccdddeee"*/ + + //data should be "aaabbbcccdddeee" buf.write(data, 15); char* firstTime = buf.getDataForFlush(buf.getRemaining()); @@ -66,7 +73,9 @@ TEST_CASE( "PmemBuffer operations", "[PmemBuffer]" ) { } } + TEST_CASE("pmemkv operations", "[pmemkv]") { + SECTION("test open and close") { std::string key = "1"; pmemkv* kv = new pmemkv("/dev/dax0.0"); @@ -100,7 +109,7 @@ TEST_CASE("pmemkv operations", "[pmemkv]") { uint64_t value_size = 0; for (int i = 0; i < size; i++) { value_size += mm->meta[i*2+1]; - } + } REQUIRE(value_size == 11); std::free(mm->meta); std::free(mm); @@ -108,6 +117,167 @@ TEST_CASE("pmemkv operations", "[pmemkv]") { delete kv; } + SECTION("test remove element from an empty list"){ + std::string key = "remove-element-from-empty-list"; + pmemkv* kv = new pmemkv("/dev/dax0.0"); + int result = kv->remove(key); + REQUIRE(result == -1); + kv->free_all(); + delete kv; + } + + + SECTION("test remove an non-existed element"){ + std::string key = "key"; + pmemkv* kv = new pmemkv("/dev/dax0.0"); + int length = 5; + kv->put(key, "first", length); + string non_existed_key = "non-exist-key"; + int result = kv->remove(non_existed_key); + REQUIRE(result == -1); + kv->free_all(); + delete kv; + } + + SECTION("test remove an element from a single node list"){ + std::string key = "key-single"; + pmemkv* kv = new pmemkv("/dev/dax0.0"); + int length = 5; + kv->put(key, "first", length); + std::cout<<"Before remove a single node, dump all: "<dump_all(); + int result = kv->remove(key); + std::cout<<"After remove a single node, dump all: "<dump_all(); + REQUIRE(result == 0); + kv->free_all(); + delete kv; + } + + SECTION("test remove an element from a middle of a list"){ + pmemkv* kv = new pmemkv("/dev/dax0.0"); + std::string key1 = "first-key"; + int length1 = 5 ; + kv->put(key1, "first", length1); + + std::string key2 = "second-key"; + int length2 = 6; + kv->put(key2, "second", length2); + + std::string key3 = "third-key"; + int length3 = 5; + kv->put(key3, "third", length3); + + std::string key4 = "forth-key"; + int length4 = 5; + kv->put(key4, "forth", length4); + + kv->reverse_dump_all(); + long r1 = kv->remove(key4); + std::cout<<"The forth is removed, dump:"<reverse_dump_all(); + + long r2 = kv->remove(key2); + std::cout<<"The second is removed, dump:"<reverse_dump_all(); + long r3 = kv->remove(key1); + std::cout<<"The first is removed, dump:"<reverse_dump_all(); + long r4 = kv->remove(key3); + std::cout<<"The third is removed, dump:"<reverse_dump_all(); + REQUIRE((r1 + r2 + r3 + r4) == 0); + + kv->free_all(); + delete kv; + } + +SECTION("test put and remove multiple elements with same key"){ + pmemkv* kv = new pmemkv("/dev/dax0.0"); + std::string key = "key-multiple-objects"; + int size = 100; + int length = 5; + for(int i = 0; i < size; i++){ + kv->put(key, "first", length); + } + + int bytes_written = kv->getBytesWritten(); + assert(bytes_written == size * length); + kv->dump_all(); + + kv->remove(key); + std::cout<<"The key with " << size << " objects is removed, dump:"<dump_all(); + + assert(kv->getBytesWritten() == 0); + + kv->free_all(); + delete kv; +} + + SECTION("test multithreaded remove") { + std::vector threads; + pmemkv* kv = new pmemkv("/dev/dax0.0"); + int size = 10; + for (uint64_t i = 0; i < size; i++) { + threads.emplace_back(test_multithread_put, i, kv); + } + for (uint64_t i = 0; i < size; i++) { + threads[i].join(); + } + + std::cout<<"mark1. kv->getBytesWritten()="<getBytesWritten()<getBytesWritten() != 0); + kv->dump_all(); + + std::vector removeThreads; + for (uint64_t i = 0; i < size; i++) { + removeThreads.emplace_back(test_multithread_remove, i, kv); + } + for (uint64_t i = 0; i < size; i++) { + removeThreads[i].join(); + } + std::cout<<"mark2. kv->getBytesWritten()="<getBytesWritten()<getBytesWritten() == 0); + kv->dump_all(); + + kv->free_all(); + delete kv; + threads.clear(); + } + +SECTION("test remove an element in specific sequence"){ + pmemkv* kv = new pmemkv("/dev/dax0.0"); + std::string key1 = "first-key"; + int length1 = 5; + kv->put(key1, "first", length1); + + std::string key2 = "second-key"; + int length2 = 6; + kv->put(key2, "second", length2); + + std::string key3 = "third-key"; + int length3 = 5; + kv->put(key3, "third", length3); + + long r1 = kv->remove(key2); + std::cout<<"The second is removed, dump:"<dump_all(); + + long r2 = kv->remove(key3); + std::cout<<"The third is removed, dump:"<dump_all(); + + long r3 = kv->remove(key1); + std::cout<<"The first is removed, dump:"<dump_all(); + + REQUIRE((r1 + r2 + r3) == 0); + + kv->free_all(); + delete kv; +} + SECTION("test multithreaded put and get") { std::vector threads; pmemkv* kv = new pmemkv("/dev/dax0.0"); diff --git a/native/src/lib_jni_pmdk.cpp b/native/src/lib_jni_pmdk.cpp index 79c49205..2ce9132b 100644 --- a/native/src/lib_jni_pmdk.cpp +++ b/native/src/lib_jni_pmdk.cpp @@ -47,7 +47,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_ uint64_t value_size; pmkv->get_value_size(key_str, &value_size); return value_size; - } +} + +JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeRemoveBlock + (JNIEnv *env, jclass obj, jlong kv, jstring key){ + const char *CStr = env->GetStringUTFChars(key, 0); + string key_str(CStr); + pmemkv *pmkv = static_cast((void*)kv); + long result = pmkv->remove(key_str); + return result; +} JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeCloseDevice (JNIEnv *env, jclass obj, jlong kv) { diff --git a/native/src/lib_jni_pmdk.h b/native/src/lib_jni_pmdk.h index fdbeab9c..34175b04 100644 --- a/native/src/lib_jni_pmdk.h +++ b/native/src/lib_jni_pmdk.h @@ -47,6 +47,14 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_spark_storage_pmof_PersistentMemory JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetBlockSize (JNIEnv *, jclass, jlong, jstring); +/* + * Class: lib_jni_pmdk + * Method: nativeRemoveBlock + * Signature: (JLjava/lang/String;)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeRemoveBlock + (JNIEnv *, jclass, jlong, jstring); + /* * Class: lib_jni_pmdk * Method: nativeGetRoot diff --git a/native/src/libjnipmdk.so b/native/src/libjnipmdk.so new file mode 100755 index 00000000..6e13f4f2 Binary files /dev/null and b/native/src/libjnipmdk.so differ diff --git a/native/src/pmemkv.h b/native/src/pmemkv.h index 3820c95d..e2b2b138 100644 --- a/native/src/pmemkv.h +++ b/native/src/pmemkv.h @@ -21,23 +21,24 @@ // block header stored in pmem struct block_hdr { - PMEMoid next; + PMEMoid next; + PMEMoid pre; uint64_t key; - uint64_t size; + uint64_t size; }; // block data entry stored in pmem struct block_entry { - struct block_hdr hdr; + struct block_hdr hdr; PMEMoid data; }; // pmem root entry struct base { - PMEMoid head; - PMEMoid tail; - PMEMrwlock rwlock; - uint64_t bytes_written; + PMEMoid head; + PMEMoid tail; + PMEMrwlock rwlock; + uint64_t bytes_written; }; // block metadata stored in memory @@ -45,6 +46,8 @@ struct block_meta { block_meta* next; uint64_t off; uint64_t size; + block_entry* bep; + PMEMoid beo; }; struct block_meta_list { @@ -140,6 +143,7 @@ class pmemkv { } char* pmem_data = (char*)pmemobj_direct(bep->data); memcpy(pmem_data, buf, count); + bep->hdr.pre = bp->tail; bep->hdr.next = OID_NULL; bep->hdr.key = key_i; bep->hdr.size = count; @@ -161,7 +165,7 @@ class pmemkv { (void) pmemobj_tx_end(); // update in-memory index - if (update_meta(bep)) { + if (update_meta(bep, beo)) { return -1; } return 0; @@ -191,6 +195,128 @@ class pmemkv { return 0; } + int removeBlocks(uint64_t shuffleId, uint64_t mapId, uint64_t partitionId){ + for (int i = 0; i < partitionId; i++){ + std::string key = "shuffle_" + std::to_string(shuffleId) + "_" + std::to_string(mapId) + "_" + std::to_string(i); + remove(key); + } + return 0; + } + + int getBytesWritten(){ + return bp->bytes_written; + } + + int remove(std::string &key){ + std::lock_guard l(mtx); + xxh::hash64_t key_i = xxh::xxhash<64>(key); + if (!index_map.contains(key_i)){ + //std::cout<<"Data with key="<rwlock,TX_PARAM_NONE)) { + std::cout<<"pmemobj_tx_begin failed in pmemkv put"<head; + + //Delete block_entry in bml one by one + while (cur != nullptr) { + block_entry* bep = cur->bep; + //Node to be deleted is at the head + if(bep == (struct block_entry*)pmemobj_direct(bp->head)){ + if (pmemobj_direct(bep->hdr.next) == nullptr){ + //There is only one block_entry + bp->head = OID_NULL; + bp->tail = OID_NULL; + bp->bytes_written = bp->bytes_written - bep->hdr.size; + pmemobj_free(&bep->data); + pmemobj_free(&cur->beo); + struct block_meta *next = cur->next; + cur->next = nullptr; + std::free(cur); + cur = next; + bytes_allocated -= sizeof(block_meta); + //std::cout<<"Only key in head is removed. key="<head = bep->hdr.next; + block_entry* new_head_pointer = (struct block_entry*)pmemobj_direct(bp->head); + new_head_pointer->hdr.pre = OID_NULL; + bp->bytes_written = bp->bytes_written - bep->hdr.size; + pmemobj_free(&bep->data); + pmemobj_free(&cur->beo); + struct block_meta *next = cur->next; + cur->next = nullptr; + std::free(cur); + cur = next; + bytes_allocated -= sizeof(block_meta); + //std::cout<<"Key in head is removed. key="<hdr.next) == nullptr){ + //The one node scenario is already covered in head judgement, there are two or more nodes here + struct block_entry* prebep = (struct block_entry*)pmemobj_direct(bep->hdr.pre); + prebep->hdr.next = OID_NULL; + bp->tail = bep->hdr.pre; + if((struct block_entry*)pmemobj_direct(bp->tail) == nullptr){ + std::cout<<"Error. The bp->tail should not be nullptr"<bytes_written = bp->bytes_written - bep->hdr.size; + pmemobj_free(&bep->data); + pmemobj_free(&cur->beo); + struct block_meta *next = cur->next; + cur->next = nullptr; + std::free(cur); + cur = next; + bytes_allocated -= sizeof(block_meta); + //std::cout<<"Key in tail is removed. key="<hdr.pre); + prebep->hdr.next = bep->hdr.next; + struct block_entry* nextbep = (struct block_entry*)pmemobj_direct(bep->hdr.next); + nextbep->hdr.pre = bep->hdr.pre; + bp->bytes_written = bp->bytes_written - bep->hdr.size; + pmemobj_free(&bep->data); + pmemobj_free(&cur->beo); + struct block_meta *next = cur->next; + cur->next = nullptr; + std::free(cur); + cur = next; + bytes_allocated -= sizeof(block_meta); + //std::cout<<"Key neither in head nor in tail is removed. key="<(key); if (!index_map.contains(key_i)) { @@ -256,6 +382,25 @@ class pmemkv { return 0; } + int reverse_dump_all() { + if (pmemobj_rwlock_rdlock(pmem_pool, &bp->rwlock) != 0) { + return -1; + } + struct block_entry* next_bep = (struct block_entry*)pmemobj_direct(bp->tail); + uint64_t read_offset = 0; + while (next_bep != nullptr) { + char* pmem_data = (char*)pmemobj_direct(next_bep->data); + char* tmp = (char*)std::malloc(next_bep->hdr.size); + memcpy(tmp, pmem_data, next_bep->hdr.size); + std::cout << "key " << next_bep->hdr.key << " value " << pmem_data << std::endl; + read_offset += next_bep->hdr.size; + std::free(tmp); + next_bep = (struct block_entry*)pmemobj_direct(next_bep->hdr.pre); + } + pmemobj_rwlock_unlock(pmem_pool, &bp->rwlock); + return 0; + } + int dump_meta() { std::cout << "pmemkv total bytes written " << bp->bytes_written << std::endl; std::lock_guard l(mtx); @@ -270,6 +415,7 @@ class pmemkv { } int free_all() { + //std::cout<<"free's begining: bp->bytes_written: "<bytes_written<head; struct block_entry* next_bep = (struct block_entry*)pmemobj_direct(next_beo); @@ -286,6 +432,7 @@ class pmemkv { // add root block to undo log bp->head = OID_NULL; bp->tail = OID_NULL; + //std::cout<<"free: bp->bytes_written: "<bytes_written<bytes_written == 0); // free metadata @@ -331,11 +478,13 @@ class pmemkv { bo = pmemobj_root(pmem_pool, sizeof(struct base)); bp = (struct base*)pmemobj_direct(bo); struct block_entry *next = (struct block_entry*)pmemobj_direct(bp->head); + PMEMoid next_beo = bp->head; while (next != nullptr) { - if (update_meta(next)) { + if (update_meta(next, next_beo)) { return -1; } - next = (struct block_entry*)pmemobj_direct(next->hdr.next); + next_beo = next->hdr.next; + next = (struct block_entry*)pmemobj_direct(next_beo); } return 0; } @@ -368,7 +517,7 @@ class pmemkv { return 0; } - int update_meta(struct block_entry* bep) { + int update_meta(struct block_entry* bep, PMEMoid beo) { std::lock_guard l(mtx); if (!index_map.contains(bep->hdr.key)) { // allocate new block_meta_list struct block_meta* bm = (struct block_meta*)std::malloc(sizeof(block_meta)); @@ -380,6 +529,8 @@ class pmemkv { bm->off = (uint64_t)pmemobj_direct(bep->data); bm->size = bep->hdr.size; bm->next = nullptr; + bm->bep = bep; + bm->beo = beo; struct block_meta_list* bml = (struct block_meta_list*)std::malloc(sizeof(block_meta_list)); if (!bml) { perror("malloc error in pmemkv update_meta"); @@ -405,6 +556,8 @@ class pmemkv { bm->off = (uint64_t)pmemobj_direct(bep->data); bm->size = bep->hdr.size; bm->next = nullptr; + bm->bep = bep; + bm->beo = beo; bml->tail->next = bm; bml->tail = bm; bml->total_size += bm->size;