Project 3 - Distributed Hash Table
Introduction
The primary purpose of this assignment is to gain experience with the concepts and implementation of remote procedure calls (RPC) and a distributed hash table (DHT). The secondary purpose is to gain experience with mixed-technology parallel/distributed computing using both Pthreads and MPI.
Description
The project distribution is available on the cluster at /shared/cs470/p3-dht.tar.gz, and consists of the following files:
- local.h / local.c: Header and implementation for a very simple local lookup table. Note that the key-value pairs are stored sorted by key in a simple array -- this is not a hash table, but it makes it easier to verify correctness by dumping all local tables at the end of a run. You should NOT edit these files.
- main.c: Driver program. This includes the client code that reads through an input file (given as a command-line argument) and executes the commands in that file as specified below. You should NOT edit this file.
- dht.h: Header definitions for a distributed hash table. See the comments in this file for a description of the DHT functions. You should NOT edit this file.
- dht.c: Implementation of the distributed hash table. Currently, these functions merely delegate to their local versions (in other words, they are just pass-through wrappers), so only the rank 0 process does anything and it processes all commands locally. Your task is to turn this into a distributed implementation as described in dht.h and below, using Pthreads and MPI.
The provided driver is controlled by input files written in a very simple command language that supports the following commands:
- proc num: Signifies the beginning of commands to be run on the given process rank.
- put key value: Instructs the DHT to create or update the given key-value pair. To keep things simple, all keys will be a single ASCII word with only alphanumeric characters, and all values will be long signed integers. For strong consistency, the "put" operation should not terminate until it has received a confirmation or acknowledgment that the value has been stored.
- get key: Instructs the DHT to retrieve the value associated with the given key. If the key is not present in the DHT, the operation should return the value KEY_NOT_FOUND (defined in local.h). The driver handles the screen output of this information.
- size: Instructs the DHT to calculate and return the total number of key-value pairs. The driver handles the screen output of this information.
- sync: Instructs the DHT to synchronize at this point. Because of the non-determinism inherent in parallel and distributed systems, this command is useful for testing the other commands. No client may proceed past its synchronization point until all clients reach theirs. Each client may only call sync once during execution, and if sync is not included explicitly in the commands for a particular process, the driver will insert an implicit sync at the end of execution.
The command language also supports comments; any line that begins with a "#" will be completely ignored. Blank lines are ignored as well.
Example
Here is an example input file:
proc 0
put xyz 5
put bar 99
put z 7
sync
get xyz
get z
get qw
size
proc 1
put baz 1
put stuff 2
sync
This input file contains instructions for two processes (rank 0 and rank 1).
Rank 0 will request the storage of three key-value pairs ("xyz:5", "bar:99",
"z:7") and rank 1 will request the storage of two key-value pairs ("baz:1",
"stuff:2"). At that point the client threads will synchronize, ensuring that
all ranks have reached the designated points in their command list. Note that
rank 2 and rank 3 will also sync, because there is an implicit sync
command at the end of each process's command list if none is specified in the
list.
After the synchronization point, rank 0 performs three retrieval operations ("xyz", "z", and "qw"), the first two of which will succeed (locally and via rank 3, respectively) and the third of which will result in a failure to find the key (on rank 1). Finally, rank 0 issues a request to calculate the total size of the DHT (which should be 5 in the distributed version).
Here is the correct output for this example:
$ salloc -Q -n 4 mpirun ./dht input1.txt
Get("xyz") = 5
Get("z") = 7
Key not found: "qw"
Size = 5
Note the use of salloc and mpirun instead of srun -- see the warning below for important information about how to use MPICH for this assignment. The "-Q" option suppresses the job allocation messages that salloc prints by default.
The output should not have any extraneous text (debugging output, etc.).
Because all processes run commands independently, the ordering of output lines
does not matter, and there could be multiple "correct" answers for a given input
file depending on how messages get re-ordered during execution. If you want to
test deterministically, only run get and size commands from a single process and use sync first.
Coding
In the provided serial version, there is no Pthread or MPI code and it runs only the commands listed for rank 0. All key-value pairs are stored locally.
In the distributed version, every key should be assigned an "owner" via the following hash function (where nprocs is the total number of MPI processes):
/*
 * given a key name, return the distributed hash table owner
 * (uses djb2 algorithm: http://www.cse.yorku.ca/~oz/hash.html)
 */
int hash(const char *name) {
    unsigned hash = 5381;
    while (*name != '\0') {
        hash = ((hash << 5) + hash) + (unsigned)(*name++);
    }
    return hash % nprocs;
}
If a process needs to "put" or "get" a value in the hash table of another process, it should do so using a remote procedure call to the other process's server thread via MPI, so that the request can be delegated to the local version there. You should not call any of the dht_* functions from within your own code. You will need to implement collective communication for the "size" and "sync" commands (although you may not necessarily need MPI collectives). You may design the RPC protocol however you like, but I suggest keeping things as simple as possible.
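For example, a remote put might look roughly like the sketch below. The rpc_msg_t struct, the RPC_* message types, the tag constants, and the my_rank variable are hypothetical names invented here for illustration (the real protocol design is up to you); local_put() and MAX_KEYLEN come from local.h, hash() is the owner function shown above, and the dht_put() signature is assumed to match dht.h.
#include <string.h>
#include <mpi.h>
#include "local.h"                      /* local_put(), MAX_KEYLEN */

/* hypothetical protocol definitions -- design your own as you see fit */
enum { RPC_PUT, RPC_PUT_ACK, RPC_GET, RPC_GET_REPLY, RPC_SIZE, RPC_EXIT };
#define TAG_SERVER 1                    /* messages addressed to a server thread */
#define TAG_CLIENT 2                    /* replies addressed to a client thread */

typedef struct {
    int  type;                          /* one of the RPC_* values above */
    char key[MAX_KEYLEN];
    long value;
} rpc_msg_t;

static int my_rank;                     /* set in dht_init() */

int hash(const char *name);             /* the owner function shown above */

void dht_put(const char *key, long value) {
    int owner = hash(key);
    if (owner == my_rank) {
        local_put(key, value);          /* local keys never leave this process */
        return;
    }

    /* marshal the request and send it to the owner's server thread */
    rpc_msg_t req = { .type = RPC_PUT, .value = value };
    strncpy(req.key, key, MAX_KEYLEN - 1);
    MPI_Send(&req, sizeof(req), MPI_BYTE, owner, TAG_SERVER, MPI_COMM_WORLD);

    /* block until the owner acknowledges the store -- this is what gives
       "put" the strong consistency required above */
    rpc_msg_t ack;
    MPI_Recv(&ack, sizeof(ack), MPI_BYTE, owner, TAG_CLIENT,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
A remote get would follow the same pattern, except that the reply carries the retrieved value (or KEY_NOT_FOUND) instead of a bare acknowledgment.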
You are also responsible for spawning and managing the server thread. You should do this in dht_init() and dht_destroy(), respectively. The server thread should execute a loop that waits for remote procedure call requests from other processes and delegates them to the appropriate local functions. The following diagram illustrates this communication if the above file is run with four processes:
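In code, a minimal version of that server loop might look like the sketch below. It reuses the hypothetical rpc_msg_t struct, RPC_* types, and tag constants from the dht_put sketch above, assumes local_put()/local_get() have the signatures suggested by local.h, and the shutdown handling shown is only one possible design.
/* server thread body, started from dht_init() via pthread_create() */
static void *server_loop(void *arg) {
    (void)arg;
    int done = 0;
    while (!done) {
        rpc_msg_t msg;
        MPI_Status status;
        MPI_Recv(&msg, sizeof(msg), MPI_BYTE, MPI_ANY_SOURCE, TAG_SERVER,
                 MPI_COMM_WORLD, &status);

        switch (msg.type) {
            case RPC_PUT: {
                /* NOTE: locking around the local_* calls is omitted here for
                   brevity -- see the synchronization hint below */
                local_put(msg.key, msg.value);
                rpc_msg_t ack = { .type = RPC_PUT_ACK };
                MPI_Send(&ack, sizeof(ack), MPI_BYTE, status.MPI_SOURCE,
                         TAG_CLIENT, MPI_COMM_WORLD);
                break;
            }
            case RPC_GET: {
                rpc_msg_t reply = { .type = RPC_GET_REPLY };
                reply.value = local_get(msg.key);   /* KEY_NOT_FOUND if absent */
                MPI_Send(&reply, sizeof(reply), MPI_BYTE, status.MPI_SOURCE,
                         TAG_CLIENT, MPI_COMM_WORLD);
                break;
            }
            case RPC_EXIT:
                /* one possible scheme: each client notifies every server when
                   it finishes; a real version would count nprocs of these */
                done = 1;
                break;
        }
    }
    return NULL;
}
The "size" and "sync" commands can reuse the same request/response pattern, with each server replying with (for example) its local count.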
At the end of execution, every MPI process should dump its hash table to a file named "dump-XXX.txt", where XXX is the rank of the process. You can easily view all of these files after execution using "cat dump-*.txt".
A correct dump for the input file listed above (run with four processes) is the
following:
$ cat dump-*.txt
Process 0
Key="xyz" Value=5
Process 1
Key="stuff" Value=2
Process 2
Key="bar" Value=99
Key="baz" Value=1
Process 3
Key="z" Value=7
Local table entries are stored in alphabetical order by key, so these dump file outputs should be deterministic.
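If the dump is your responsibility in dht_destroy() (as the description above implies), the file-name handling might look like this minimal sketch. The "%d" format and the way the table contents get written are guesses here, so match whatever the provided code and the expected output format actually require.
#include <stdio.h>

/* in dht_destroy(), after all RPC traffic has stopped: */
char fname[32];
snprintf(fname, sizeof(fname), "dump-%d.txt", my_rank);   /* "XXX" = rank */
FILE *fp = fopen(fname, "w");
if (fp != NULL) {
    /* write the "Process N" header and the table contents here, using
       whatever dump support local.h/local.c actually provide */
    fprintf(fp, "Process %d\n", my_rank);
    fclose(fp);
}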
Your solution must work reliably for up to 64 processes across four nodes with up to 65K keys per process. Each MPI process should consist of two threads: a client (the main thread) and a server. You should NOT modify any of the client code, which reads the input file and executes the appropriate DHT commands.
The goal of this project is to provide a transparent and consistent view of a distributed lookup table, so performance is NOT a significant concern. As long as your submission runs in a reasonable amount of time (e.g., a few seconds for several hundred commands), you do not need to evaluate the scaling of your solution.
WARNING: The version of OpenMPI (the default MPI package) on our cluster does NOT have full support for multithreading. Thus, you must use MPICH for this project. Run the following command to enable MPICH:
module load mpi/mpich-4.2.0-x86_64
Be sure you do not also have OpenMPI loaded. If you have "module load mpi" in your .bash_profile or .bashrc script, delete it and re-login.
Don't forget to initialize and finalize MPI! Because you are using threads in addition to MPI, you will want to initialize MPI using MPI_Init_thread() instead of MPI_Init(). Here is an example:
int provided;
MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
if (provided != MPI_THREAD_MULTIPLE) {
    printf("ERROR: Cannot initialize MPI in THREAD_MULTIPLE mode.\n");
    exit(EXIT_FAILURE);
}
This snippet requests the full multithreaded mode (MPI_THREAD_MULTIPLE).
Analysis
After you complete your solution, you must answer the following analysis questions in 1-2 paragraphs each in a comment at the top of your source file:
- Did you use an AI-assist tool while constructing your solution? In what ways was it helpful (or not)?
- Describe your RPC protocol. How are the various different kinds of messages distinguished by the recipient? How did you ensure that each thread receives the appropriate messages? Is your protocol primarily synchronous or asynchronous?
- How did you verify that your solution is non-deterministic?
- Suppose that we wanted to add a new command called "sort" that performs a distributed merge sort, collecting the sorted results at rank 0 using a fan pattern as shown in Figure 3.6 of our textbook. What difficulties do you anticipate and how would you address them? Include as much detail as possible.
- For this project, performance was not a significant concern because our cluster is not large enough to run with large process or key counts. However, suppose that we wanted to scale to thousands of processes and millions of keys. Which of the commands ("put", "get", "sync", and "size") would remain constant in terms of the time required to complete a single command? Of the rest, would their performance degrade proportional to the number of processes, the number of keys, or both? What about the "sort" operation from the previous question?
Hints
HINT: Work incrementally! Don't try to implement the entire protocol at once. Start by augmenting the init and destroy routines to spawn and clean up the server thread (and make sure init returns the process's MPI rank). Then implement the remote put functionality (asynchronously at first), using the end-of-execution dump files to check for correctness. You will also need to make sure that the server threads do not exit until all clients have finished processing commands. Once that is working, implement sync and size (which will require a request/response RPC). This will help you change the put functionality to be synchronous (as required by the spec above). Finally, you can use all of that knowledge to implement the get function.
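A minimal sketch of that first step (augmenting init and destroy) is shown below. It assumes the server_loop() and my_rank definitions from the earlier sketches and declares the nprocs variable used by hash(); the dht_init()/dht_destroy() signatures shown are assumptions, so match whatever dht.h actually declares.
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

static pthread_t server_thread;
static int nprocs;                      /* used by the hash() function above */

int dht_init() {
    int provided;
    MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
    if (provided != MPI_THREAD_MULTIPLE) {
        printf("ERROR: Cannot initialize MPI in THREAD_MULTIPLE mode.\n");
        exit(EXIT_FAILURE);
    }
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    /* keep whatever local-table setup the provided pass-through code does,
       then spawn the server thread */
    pthread_create(&server_thread, NULL, server_loop, NULL);

    return my_rank;                     /* init should return the process rank */
}

void dht_destroy() {
    /* signal the server threads that it is safe to stop (e.g., by sending
       RPC_EXIT messages), then wait for our own server thread to finish */
    pthread_join(server_thread, NULL);

    /* keep whatever local-table teardown the provided pass-through code does */
    MPI_Finalize();
}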
HINT: For the purposes of sending RPC messages, you don't need full MPI derived datatypes because we don't need to worry about heterogeneous nodes. Just declare a C struct with all the fields you want and send it via MPI using the MPI_BYTE data type.
HINT: Consider writing some wrappers to handle parameter marshalling to/from MPI messages. This will clean up your client and server code considerably.
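Combining the two hints above, a pair of marshalling wrappers might look like the sketch below, again using the hypothetical rpc_msg_t struct and tag constants from the earlier sketches.
/* marshal an RPC message into a plain struct and ship it as raw bytes;
   no MPI derived datatype is needed on a homogeneous cluster */
static void send_msg(int dest, int tag, int type, const char *key, long value) {
    rpc_msg_t msg = { .type = type, .value = value };
    if (key != NULL) {
        strncpy(msg.key, key, MAX_KEYLEN - 1);
    }
    MPI_Send(&msg, sizeof(msg), MPI_BYTE, dest, tag, MPI_COMM_WORLD);
}

/* receive one message intended for this thread (selected by tag) and
   return the sender's rank so that replies can be addressed correctly */
static int recv_msg(int tag, rpc_msg_t *msg) {
    MPI_Status status;
    MPI_Recv(msg, sizeof(*msg), MPI_BYTE, MPI_ANY_SOURCE, tag,
             MPI_COMM_WORLD, &status);
    return status.MPI_SOURCE;
}
Both the client and server code can then build every RPC exchange out of these two calls, which keeps the protocol logic in one place.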
HINT: This project is complicated by the fact that both the server and client threads will need to send and receive MPI messages. Consider using MPI tags to help differentiate between the intended destination threads.
HINT: Don't forget that you will need some method of thread synchronization if you make modifications to the local hash table from both the server and the client threads.
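For example, a single mutex around every access to the local table is the simplest safe approach. Here is a minimal sketch; the locked_* wrapper names are invented, and local_put()/local_get() are assumed to have the signatures suggested by local.h.
static pthread_mutex_t local_lock = PTHREAD_MUTEX_INITIALIZER;

/* route every local-table access, from either thread, through these wrappers */
static void locked_put(const char *key, long value) {
    pthread_mutex_lock(&local_lock);
    local_put(key, value);
    pthread_mutex_unlock(&local_lock);
}

static long locked_get(const char *key) {
    pthread_mutex_lock(&local_lock);
    long value = local_get(key);
    pthread_mutex_unlock(&local_lock);
    return value;
}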
HINT: You will need to make sure that the server threads do not terminate until all nodes are done sending messages. The most common way to do this cleanly is to have the server thread go into an "infinite" loop, stopping when it receives a message (or messages) indicating that it is safe to do so. You should NOT use any of the Pthread methods for forcefully killing the thread, such as pthread_cancel() or pthread_kill().
HINT: As implied by the MAX_KEYLEN macro in local.h, you may assume that all keys will be less than 64 characters long.
HINT: Only the client thread (e.g., the code in main()) should call the dht_* API functions; all of your code should be implemented internally or delegate to the local_* versions.
Grading Criteria
Your submission will be graded on a combination of correctness, functionality, elegance, style, and documentation. Here is a tentative grading rubric:
Grade | Description | Requirements
---|---|---
A | Exceptional |
B | Good |
C | Satisfactory |
D | Deficient |
F | Unacceptable |
Submission
You should submit your modified program as dht.c on Canvas by the due date. You should not submit your Makefile or anything else, and your program must compile and run on the cluster using the reference version of the other files.
All submitted code should be elegant, clean, consistent, and well-documented. The code I have provided uses 4-space indentation and the 1TBS style. If you choose to reformat the file to match your own style preferences, make sure you apply the changes throughout.
Code Reviews
One of the goals of this course is to encourage good software development practices, especially when building a parallel or distributed software system. For this project, you will be assigned two other random students in the course. You must review their code and offer constructive feedback according to the given rubric.
After the submission deadline, I will assign you two other submissions to review on Canvas. I expect you to review two distinct submissions that are not your own; I will do my best to ensure that the assignments work out but sometimes Canvas does not cooperate. It is your responsibility to inform me immediately if you have been assigned yourself, your partner, or two other same-team partners to review.
To complete the code review, submit a Canvas comment on the submission with your assessment of the submission according to the provided code review guidelines. I expect at least two paragraphs per review with detailed observations and an assigned overall numeric score. You will be graded on the quality, thoroughness, and professional tone of your review. The code reviews are due one week after the original project due date.