Project 4 - Distributed Hash Table
Introduction
The primary purpose of this assignment is to gain experience with the concepts and implementation of remote procedure calls (RPC) and a distributed hash table (DHT). The secondary purpose is to gain experience with mixed-technology parallel/distributed computing using both Pthreads and MPI.
Description
The starter files may be copied from /shared/cs470/p4-dht on the cluster, and consist of the following files:
- local.h / local.c: Header and implementation for a very simple local lookup table. Note that the key-value pairs are stored sorted by key in a simple array; this is not a hash table, but it makes it easier to verify correctness by dumping all local tables at the end of a run. You should NOT edit this file.
- main.c: Driver program. This includes client code that reads through an input file (given as a command-line argument) and executes the commands in that file as specified below. You should NOT edit this file.
- dht.h: Header definitions for a distributed hash table. See the comments in this file for a description of the DHT functions. You should NOT edit this file.
- dht.c: Implementation for a distributed hash table. Currently, these functions merely delegate to their local versions (in other words, they are currently just pass-through wrappers). This means that only the rank 0 process will do anything, and it will process all commands locally. Your task is to turn this implementation into a distributed implementation as described in dht.h and below, using Pthreads and MPI.
The provided driver is controlled by input files written in a very simple command language that supports the following commands:
- proc num: Signifies the beginning of commands to be run on the given process ID.
- put key value: Instructs the DHT to create or update the given key-value pair. To keep things simple, all keys will be a single ASCII word with only alphanumeric characters, and all values will be long signed integers. For strong consistency, the "put" operation should not terminate until it has received a confirmation or acknowledgment that the value has been stored.
- get key: Instructs the DHT to retrieve the value associated with the given key. If the key is not present in the DHT, the operation should return the value KEY_NOT_FOUND (defined in local.h). The driver handles the screen output of this information.
- size: Instructs the DHT to calculate and return the total number of key-value pairs. The driver handles the screen output of this information.
- sync: Instructs DHT processes to synchronize at this point. Because of the non-determinism inherent in parallel and distributed systems, this command is useful for testing the other commands. No process may continue executing commands until all processes reach their synchronization point. Each process may only call sync once during execution, and if sync is not included explicitly in the commands for a particular process, the driver will insert an implicit sync at the end of execution.
The command language also supports comments; any line that begins with a "#" will be completely ignored. Blank lines are ignored as well.
Here is an example input file:
    proc 0
    put xyz 5
    put bar 99
    put cat 7
    sync
    get xyz
    get cat
    get abc
    size
    proc 1
    put baz 1
    put aaa 2
    sync
This input file contains instructions for two processes (rank 0 and rank 1).
Rank 0 will request the storage of three key-value pairs ("xyz:5", "bar:99",
"cat:7") and rank 1 will request the storage of two key-value pairs ("baz:1",
"aaa:2"). At that point the client processes will synchronize, ensuring that
all ranks have reached the designated points in their command list. Note that
rank 2 and rank 3 will also sync, because there is an implicit sync
command at the end of each process's command list if none is specified in the
list.
After the synchronization point, rank 0 performs three retrieval operations ("xyz", "cat", and "abc"), the first two of which will succeed (locally and via rank 3, respectively) and the third of which will result in a failure to find the key (on rank 1). Finally, rank 0 issues a request to calculate the total size of the DHT (which will be 5 in the distributed version).
Here is the correct output for this example (minus the salloc output):

    $ salloc -n 4 mpirun ./dht input1.txt
    Get("xyz") = 5
    Get("cat") = 7
    Key not found: "abc"
    Size = 5
The output should not have any extraneous text (debugging output, etc.). Because all processes run commands independently, the ordering of output lines does not matter, and there could be multiple "correct" answers for a given input file depending on how messages get re-ordered during execution. If you want to test deterministically, only run get and size commands from a single process and use sync first.
In the provided serial version, there is no Pthread or MPI code and it runs only the commands listed for rank 0. All key-value pairs are stored locally.
In the distributed version, every key should be assigned an "owner" via a very simple hash function: the ASCII value of the key's first character modulo the total number of processes. For instance, if there are four processes, key "bar" will map to rank 2 because the ASCII code for 'b' is 98 and 98 mod 4 = 2. All key characters will be alphanumeric.
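For reference, a minimal sketch of that owner computation might look like the following (the function name is only an illustration, not part of the provided interface):

    /* Map a key to its owning rank: the ASCII value of the key's first
     * character modulo the number of processes. With four processes,
     * "bar" -> 'b' = 98, and 98 % 4 = 2. */
    static int key_owner(const char *key, int nprocs)
    {
        return ((unsigned char)key[0]) % nprocs;
    }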
If a process needs to "put" or "get" a value that belongs in another process's hash table, it should do so using a remote procedure call via MPI. You will also need to implement all-process communication for the "size" and "sync" commands. You may design the RPC protocol however you like, but I suggest keeping things as simple as possible.
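As one possible shape for such a call (a sketch, not a required design), a "get" could send the key to the owning rank and block until a single long value comes back; the tag values and key-length limit below are illustrative assumptions:

    #include <mpi.h>
    #include <string.h>

    #define MAX_KEYLEN   64     /* assumed maximum key length */
    #define TAG_GET_REQ  10     /* illustrative tag choices   */
    #define TAG_GET_RESP 11

    /* Client-side "get" RPC: send the key to the owning rank and wait
     * for the value (or KEY_NOT_FOUND) in the reply. */
    static long rpc_get(int owner, const char *key)
    {
        char buf[MAX_KEYLEN] = {0};
        strncpy(buf, key, MAX_KEYLEN - 1);
        MPI_Send(buf, MAX_KEYLEN, MPI_CHAR, owner, TAG_GET_REQ, MPI_COMM_WORLD);

        long value;
        MPI_Recv(&value, 1, MPI_LONG, owner, TAG_GET_RESP, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
        return value;
    }

If the key is owned by the local rank, the client could instead skip the message and call the local table directly; either choice is a design decision left to you.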
You are also responsible for spawning and managing the server thread. You should do this in dht_init() and dht_destroy(), respectively. The server thread should execute a loop that waits for remote procedure call requests from other processes and delegates them to the appropriate local methods. The following diagram illustrates this communication if the above file is run with four processes:
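As a rough sketch of that lifecycle in code (the helper names start_server, stop_server, and server_loop are placeholders for work that would live in dht_init() and dht_destroy(); error handling is minimal):

    #include <mpi.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    static pthread_t server_thread;   /* handle kept for the join during cleanup */

    /* Server loop: wait for RPC requests from any rank, delegate each one
     * to the local table, and reply, until it is told to shut down. */
    static void *server_loop(void *arg)
    {
        (void)arg;
        /* ... MPI_Recv request, dispatch to local call, send reply ... */
        return NULL;
    }

    /* Work done in dht_init() after MPI is initialized: spawn the server. */
    static void start_server(void)
    {
        if (pthread_create(&server_thread, NULL, server_loop, NULL) != 0) {
            fprintf(stderr, "ERROR: could not create server thread\n");
            exit(EXIT_FAILURE);
        }
    }

    /* Work done in dht_destroy(): wait for the server loop to finish,
     * then finalize MPI. */
    static void stop_server(void)
    {
        pthread_join(server_thread, NULL);
        MPI_Finalize();
    }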
At the end of execution, every MPI process should dump its hash table to a file named "dump-XX.txt", where XX is the rank of the process. You can easily view all these files after execution using "cat dump-*.txt".
A correct dump for the input file listed above (run with four processes) is the
following:
    $ cat dump-*.txt
    Process 0
    Key="xyz" Value=5
    Process 1
    Key="aaa" Value=2
    Process 2
    Key="bar" Value=99
    Key="baz" Value=1
    Process 3
    Key="cat" Value=7
Local table entries are stored in alphabetical order by key, so these dump file outputs should be deterministic.
Your solution must work reliably for up to 64 processes across eight nodes. Each MPI process should consist of two threads: a client (the main thread) and a server. You should NOT modify any of the client code, which reads the input file and executes the appropriate DHT commands.
The goal of this project is to provide a transparent and consistent view of a distributed lookup table, so performance is NOT a significant concern. As long as your submission runs in a reasonable amount of time (e.g., a few seconds for several hundred commands), you do not need to evaluate the scaling of your solution.
WARNING: The version of OpenMPI (the default MPI package) on our cluster does NOT have full support for multithreading. Thus, you must use MPICH for this project. Instructions for installing Spack and MPICH are in the cluster guide. Be sure you do not also have OpenMPI loaded. If you have "module load mpi" in your .bash_profile or .bashrc script, you should delete it and re-login.
Don't forget to initialize and finalize MPI! Because you are using threads in addition to MPI, you will want to initialize MPI using MPI_Init_thread() instead of MPI_Init(). Here is an example:
    int provided;
    MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
    if (provided != MPI_THREAD_MULTIPLE) {
        printf("ERROR: Cannot initialize MPI in THREAD_MULTIPLE mode.\n");
        exit(EXIT_FAILURE);
    }
This snippet requests the full multithreaded mode (MPI_THREAD_MULTIPLE). There are actually four levels of multithreading support in MPI, and unfortunately our installation of OpenMPI only supports the third level (MPI_THREAD_SERIALIZED). You can verify this yourself by checking the value of provided after the init call (as in the above snippet). Thankfully, MPICH supports the MPI_THREAD_MULTIPLE level, so you can use it for this project.
Hints
HINT: Work incrementally! Don't try to implement the entire protocol at once. Start by augmenting the init and destroy routines to spawn and clean up the server thread. Then implement the remote put functionality (asynchronously at first), using the end-of-execution dump files to check for correctness. You will also need to make sure that the server threads do not exit until all clients have finished processing commands. Once that is working, implement sync and size (which will require a request/response RPC). This will help you change the put functionality to be synchronous (as required by the above spec). Finally, you can use all of that knowledge to implement the get function.
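For example, one way (among several) to realize the size request/response is for the requesting client to ask every rank's server for its local count and sum the replies; the tag values below are arbitrary choices, and each server is assumed to answer with one long:

    #include <mpi.h>

    #define TAG_SIZE_REQ  20    /* illustrative tag choices */
    #define TAG_SIZE_RESP 21

    /* Client-side "size": ask every rank (including this one, whose server
     * runs in a separate thread and can answer a self-send) for its local
     * count and total the replies. */
    static long rpc_size(int nprocs)
    {
        long total = 0;
        for (int rank = 0; rank < nprocs; rank++) {
            int dummy = 0;
            MPI_Send(&dummy, 1, MPI_INT, rank, TAG_SIZE_REQ, MPI_COMM_WORLD);

            long count;
            MPI_Recv(&count, 1, MPI_LONG, rank, TAG_SIZE_RESP, MPI_COMM_WORLD,
                     MPI_STATUS_IGNORE);
            total += count;
        }
        return total;
    }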
HINT: For the purposes of sending RPC messages, you don't need to use full MPI derived datatypes. Just declare a C struct with all the fields you want and send it via MPI using the MPI_BYTE data type.
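For instance, a synchronous "put" might be marshalled into one struct and shipped as raw bytes; the struct layout, key-length limit, and tag values here are illustrative assumptions, not a required format:

    #include <mpi.h>
    #include <string.h>

    #define MAX_KEYLEN  64      /* assumed maximum key length */
    #define TAG_PUT_REQ 30      /* illustrative tag choices   */
    #define TAG_PUT_ACK 31

    /* One RPC request, sent as raw bytes with MPI_BYTE. */
    typedef struct {
        char key[MAX_KEYLEN];
        long value;
    } put_msg_t;

    /* Client side: marshal the pair, send it to the owner, and block
     * until an acknowledgment arrives (keeping "put" synchronous). */
    static void rpc_put(int owner, const char *key, long value)
    {
        put_msg_t msg;
        memset(&msg, 0, sizeof(msg));
        strncpy(msg.key, key, MAX_KEYLEN - 1);
        msg.value = value;

        MPI_Send(&msg, (int)sizeof(msg), MPI_BYTE, owner, TAG_PUT_REQ,
                 MPI_COMM_WORLD);

        int ack;
        MPI_Recv(&ack, 1, MPI_INT, owner, TAG_PUT_ACK, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
    }

    /* Server side: receive the same struct as raw bytes; the status tells
     * the server which rank to acknowledge after storing the pair. */
    static void recv_put(put_msg_t *msg, int *from)
    {
        MPI_Status status;
        MPI_Recv(msg, (int)sizeof(*msg), MPI_BYTE, MPI_ANY_SOURCE, TAG_PUT_REQ,
                 MPI_COMM_WORLD, &status);
        *from = status.MPI_SOURCE;
    }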
HINT: Consider writing some wrappers to handle parameter marshalling to/from MPI messages. This will clean up your client and server code considerably.
HINT: This project is complicated by the fact that both the server and client threads will need to send and receive MPI messages. Consider using MPI tags to help differentiate between the intended destination threads.
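For instance (the names and values below are arbitrary), reserving distinct tags for server-bound requests and client-bound responses lets each thread post receives that match only its own traffic:

    #include <mpi.h>

    /* Illustrative tag convention: the server thread receives only *_REQ
     * tags, while the client thread receives only *_RESP / *_ACK tags,
     * so the two threads never consume each other's messages. */
    enum {
        TAG_PUT_REQ  = 1, TAG_PUT_ACK   = 2,
        TAG_GET_REQ  = 3, TAG_GET_RESP  = 4,
        TAG_SIZE_REQ = 5, TAG_SIZE_RESP = 6
    };

    /* Example: the client waits only on its own response tag. */
    static long wait_for_get_response(int owner)
    {
        long value;
        MPI_Recv(&value, 1, MPI_LONG, owner, TAG_GET_RESP, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
        return value;
    }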
HINT: Don't forget that you will need some method of thread synchronization if you make modifications to the local hash table from both the server and the client threads.
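One common pattern, sketched here with an assumed local_put() prototype standing in for whatever local.h actually provides, is to guard every local-table access with a single mutex shared by the client and server threads:

    #include <pthread.h>

    /* Assumed local-table call; the real name and signature are in local.h. */
    extern void local_put(const char *key, long value);

    static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;

    /* Both the client thread and the server thread funnel local-table
     * updates through a helper like this so they never race. */
    static void locked_put(const char *key, long value)
    {
        pthread_mutex_lock(&table_lock);
        local_put(key, value);
        pthread_mutex_unlock(&table_lock);
    }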
HINT: You will need to make sure that the server threads do not terminate until all nodes are done sending messages. The most common way to do this cleanly is to have the server thread go into an "infinite" loop, stopping when it receives a message (or messages) indicating that it is safe to do so. You should NOT use any of the Pthread methods for forcefully killing the thread, such as pthread_cancel() or pthread_kill().
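One clean way to arrange this, sketched below with an illustrative tag and a bookkeeping-only loop, is to have every client send a "done" message to every server when it finishes its commands; each server exits its loop once it has heard from all ranks:

    #include <mpi.h>

    #define TAG_DONE 99     /* illustrative tag for shutdown notifications */

    /* Skeleton of the server loop's exit condition: keep serving requests
     * until a "done" notification has arrived from every process. A real
     * server would also receive and dispatch put/get/size requests here;
     * this sketch shows only the termination bookkeeping. */
    static void serve_until_done(int nprocs)
    {
        int done_count = 0;
        while (done_count < nprocs) {
            int dummy;
            MPI_Recv(&dummy, 1, MPI_INT, MPI_ANY_SOURCE, TAG_DONE,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            done_count++;
        }
    }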
Grading Criteria
Your submission will be graded on a combination of correctness, functionality, elegance, style, and documentation. Here is a tentative grading rubric:
Grade | Description | Requirements
---|---|---
A | Exceptional |
B | Good |
C | Satisfactory |
D | Deficient |
F | Unacceptable |
Submission
You should submit your modified program as dht.c on Canvas by the due date. You should not submit your Makefile or anything else, and your program must compile and run on the cluster using the reference versions of the other files.
All submitted code should be elegant, clean, consistent, and well-documented. The code I have provided uses 4-space indentation and the 1TBS style. If you choose to reformat the file to match your own style preferences, make sure you apply the changes throughout.
Peer Reviews
One of the goals of this course is to encourage good software development practices, especially when building a parallel or distributed software system. For this project, you will be assigned two other random students in the course. You must review their code and offer constructive feedback according to the given rubric.
After the submission deadline, I will assign you two other submissions to review on Canvas. I expect you to review two distinct submissions that are not your own; I will do my best to ensure that the assignments work out but sometimes Canvas does not cooperate. It is your responsibility to inform me immediately if you have been assigned yourself, your partner, or two other same-team partners to review.
To complete the peer review, submit a Canvas comment on the submission with your assessment of the submission according to the provided code review guidelines. I expect at least two paragraphs per review with detailed observations and an assigned overall numeric score. You will be graded on the quality, thoroughness, and professional tone of your review. The peer reviews are due one week after the original project due date.