Project 4 - Distributed Hash Table

Introduction

The primary purpose of this assignment is to gain experience with the concepts and implementation of remote procedure calls (RPC) and a distributed hash table (DHT). The secondary purpose is to gain experience with mixed-technology parallel/distributed computing using both Pthreads and MPI.

Description

The project distribution is available on the cluster at /shared/cs470/p4-dht.tar.gz, and consists of the following files:

  • local.h / local.c

    Header and implementation for a very simple local lookup table. Note that the key-value pairs are stored sorted by key in a simple array--this is not a hash table, but it makes it easier to verify correctness by dumping all local tables at the end of a run. You should NOT edit these files.
  • main.c

    Driver program. This includes client code that reads through an input file (given as a command-line argument) and executes the commands in that file as specified below. You should NOT edit this file.
  • dht.h

    Header definitions for a distributed hash table. See the comments in this file for a description of the DHT functions. You should NOT edit this file.
  • dht.c

    Implementation for a distributed hash table. Currently, these functions merely delegate to their local versions (in other words, they're currently just pass-through wrappers). This means that only the rank 0 process will do anything and it will process all commands locally. Your task is to turn this implementation into a distributed implementation as described in dht.h and below, using Pthreads and MPI.

The provided driver is controlled by input files written in a very simple command language that supports the following commands:

  • proc num

    Signifies the beginning of commands to be run on the given process rank.
  • put key value

    Instructs the DHT to create or update the given key-value pair. To keep things simple, all keys will be a single ASCII word with only alphanumeric characters, and all values will be long signed integers. For strong consistency, the "put" operation should not terminate until it has received a confirmation or acknowledgment that the value has been stored.
  • get key

    Instructs the DHT to retrieve the value associated with the given key. If the key is not present in the DHT, the operation should return the value KEY_NOT_FOUND (defined in local.h). The driver handles the screen output of this information.
  • size

    Instructs the DHT to calculate and return the total number of key-value pairs. The driver handles the screen output of this information.
  • sync

    Instructs the DHT to synchronize at this point. Because of the non-determinism inherent in parallel and distributed systems, this command is useful for testing the other commands. No client may proceed past its synchronization point until all clients reach theirs. Each client may only call sync once during execution, and if sync is not included explicitly in the commands for a particular process, the driver will insert an implicit sync at the end of execution.

The command language also supports comments; any line that begins with a "#" will be completely ignored. Blank lines are ignored as well.

Here is an example input file:

proc 0

put xyz 5
put bar 99
put z 7
sync
get xyz
get blah
get qw
size

proc 1

put baz 1
put stuff 2
sync

This input file contains instructions for two processes (rank 0 and rank 1). Rank 0 will request the storage of three key-value pairs ("xyz:5", "bar:99", "z:7") and rank 1 will request the storage of two key-value pairs ("baz:1", "stuff:2"). At that point the client threads will synchronize, ensuring that all ranks have reached the designated points in their command list. Note that if the file is run with four processes, rank 2 and rank 3 will also sync even though the file lists no commands for them, because there is an implicit sync command at the end of each process's command list if none is specified in the list.

After the synchronization point, rank 0 performs three retrieval operations ("xyz", "blah", and "qw"). The first succeeds locally on rank 0. The other two fail because neither key was ever stored; the lookup for "qw", for example, is sent to its owner (rank 1), which reports that the key is not present. Finally, rank 0 issues a request to calculate the total size of the DHT (which should be 5 in the distributed version).

Here is the correct output for this example (minus the salloc output):

$ salloc -n 4 mpirun ./dht input1.txt
Get("xyz") = 5
Key not found: "blah"
Key not found: "qw"
Size = 5

The output should not have any extraneous text (debugging output, etc.). Because all processes run commands independently, the ordering of output lines does not matter, and there could be multiple "correct" answers for a given input file depending on how messages get re-ordered during execution. If you want to test deterministically, only run get and size commands from a single process and use sync first.

In the provided serial version, there is no Pthread or MPI code and it runs only the commands listed for rank 0. All key-value pairs are stored locally.

In the distributed version, every key should be assigned an "owner" via the following hash function (where nprocs is the total number of MPI processes):

/*
 * given a key name, return the distributed hash table owner
 * (uses djb2 algorithm: http://www.cse.yorku.ca/~oz/hash.html)
 */
int hash(const char *name)
{
    unsigned hash = 5381;
    while (*name != '\0') {
        hash = ((hash << 5) + hash) + (unsigned)(*name++);
    }
    return hash % nprocs;
}

If a process needs to "put" a value into or "get" a value from the hash table of another process, it should do so using a remote procedure call to the other process's server thread via MPI so that the request can be delegated to the local version on that process. You should not call any of the dht_* functions from within your own code. You will need to implement collective communication for the "size" and "sync" commands (although you may not necessarily need MPI collectives). You may design the RPC protocol however you like, but I suggest keeping things as simple as possible.

You are also responsible for spawning and managing the server thread. You should do this in dht_init() and dht_destroy(), respectively. The server thread should execute a loop that waits for remote procedure call requests from other processes and delegates them to the appropriate local methods. The following diagram illustrates this communication if the above file is run with four processes:

[Figure: communication diagram for the example input file run with four processes]

At the end of execution, every MPI process should dump its hash table to a file named "dump-XXX.txt" where XXX is the rank of the process. You can easily view all these files after execution using "cat dump-*.txt". A correct dump for the input file listed above (run with four processes) is the following:

$ cat dump-*.txt
Process 0
  Key="xyz" Value=5
Process 1
  Key="stuff" Value=2
Process 2
  Key="bar" Value=99
  Key="baz" Value=1
Process 3
  Key="z" Value=7

Local table entries are stored in alphabetical order by key, so these dump file outputs should be deterministic.

Your solution must work reliably for up to 64 processes across eight nodes with up to 65K keys per process. Each MPI process should consist of two threads: a client (the main thread) and a server. You should NOT modify any of the client code, which reads the input file and executes the appropriate DHT commands.

The goal of this project is to provide a transparent and consistent view of a distributed lookup table, so performance is NOT a significant concern. As long as your submission runs in a reasonable amount of time (e.g., a few seconds for several hundred commands), you do not need to optimize it or evaluate its scaling.

WARNING: The version of OpenMPI (the default MPI package) on our cluster does NOT have full support for multithreading. Thus, you must use MPICH for this project. Run the following command to enable MPICH:

module load mpi/mpich-3.2.1

Be sure you do not also have OpenMPI loaded. If you had "module load mpi" in your .bash_profile or .bashrc script, you should delete it and re-login.

Don't forget to initialize and finalize MPI! Because you are using threads in addition to MPI, you will want to initialize MPI using MPI_Init_thread() instead of MPI_Init(). Here is an example:

    int provided;
    MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
    if (provided != MPI_THREAD_MULTIPLE) {
        printf("ERROR: Cannot initialize MPI in THREAD_MULTIPLE mode.\n");
        exit(EXIT_FAILURE);
    }

This snippet requests the full multithreaded mode (MPI_THREAD_MULTIPLE). MPI actually defines four levels of thread support (MPI_THREAD_SINGLE, MPI_THREAD_FUNNELED, MPI_THREAD_SERIALIZED, and MPI_THREAD_MULTIPLE), and unfortunately our installation of OpenMPI only supports up to the third level (MPI_THREAD_SERIALIZED). You can verify this yourself by checking the value of provided after the init call (as in the above snippet). Thankfully, MPICH supports the MPI_THREAD_MULTIPLE level, so you can use it for this project.

Hints

HINT: Work incrementally! Don't try to implement the entire protocol at once. Start by augmenting the init and destroy routines to spawn and clean up the server thread (and make sure init returns the process's MPI rank). Then implement the remote put functionality (asynchronously at first), using the end-of-execution dump files to check for correctness. You will also need to make sure that the server threads do not exit until all clients have finished processing commands. Once that is working, implement sync and size (which will require a request/response RPC). This will help you change the put functionality to be synchronous (as required by the above spec). Finally, you can use all of that knowledge to implement the get function.

HINT: For the purposes of sending RPC messages, you don't need to do full MPI derived datatypes because we don't need to worry about heterogeneous nodes. Just declare a C struct with all the fields you want and send it via MPI using the MPI_BYTE data type.
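
One possible shape for such a struct is sketched below. Everything here is an assumption made for illustration: the names rpc_op_t, rpc_msg_t, make_put(), and roundtrip() are hypothetical, the field layout is only one of many reasonable choices, and MAX_KEYLEN is given a fallback value of 64 only so the example compiles without local.h. The roundtrip() helper copies the struct through a raw byte buffer, which mimics what happens when the struct is sent as sizeof(rpc_msg_t) elements of MPI_BYTE between identical nodes.

```c
#include <assert.h>
#include <string.h>

#ifndef MAX_KEYLEN
#define MAX_KEYLEN 64   /* assumed fallback; the real value comes from local.h */
#endif

/* Hypothetical operation codes for the RPC protocol. */
typedef enum { RPC_PUT = 1, RPC_GET, RPC_ACK, RPC_VALUE } rpc_op_t;

/* Fixed-size message with no pointers, so its raw bytes can be sent as-is. */
typedef struct {
    int  op;                /* one of rpc_op_t */
    int  sender;            /* rank that should receive the reply */
    long value;             /* payload for a put, or result of a get */
    char key[MAX_KEYLEN];   /* NUL-terminated key */
} rpc_msg_t;

/* Build a put request; zeroing first keeps padding bytes deterministic. */
rpc_msg_t make_put(int sender, const char *key, long value)
{
    rpc_msg_t msg;
    assert(key != NULL && strlen(key) < MAX_KEYLEN);
    memset(&msg, 0, sizeof(msg));
    msg.op = RPC_PUT;
    msg.sender = sender;
    msg.value = value;
    strncpy(msg.key, key, MAX_KEYLEN - 1);
    return msg;
}

/* Copy a message through a byte buffer, as a byte-wise transfer would. */
rpc_msg_t roundtrip(const rpc_msg_t *msg)
{
    unsigned char wire[sizeof(rpc_msg_t)];
    rpc_msg_t out;
    memcpy(wire, msg, sizeof(wire));
    memcpy(&out, wire, sizeof(out));
    return out;
}
```

In the project itself, the memcpy pair would be replaced by a matching send and receive of sizeof(rpc_msg_t) bytes with the MPI_BYTE data type.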

HINT: Consider writing some wrappers to handle parameter marshalling to/from MPI messages. This will clean up your client and server code considerably.

HINT: This project is complicated by the fact that both the server and client threads will need to send and receive MPI messages. Consider using MPI tags to help differentiate between the intended destination threads.
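
A minimal sketch of one such tagging scheme follows. The tag values, operation codes, and the tag_for() helper are all hypothetical; the idea is simply that every request travels on one tag (which only server threads receive on) and every reply travels on another (which only the waiting client thread receives on), so a client never accidentally consumes a request meant for its own server.

```c
#include <assert.h>

/* Hypothetical MPI tag values; any two distinct non-negative ints would do. */
enum { TAG_REQUEST = 1, TAG_REPLY = 2 };

/* Hypothetical operation codes carried inside each message. */
enum { RPC_PUT = 1, RPC_GET, RPC_ACK, RPC_VALUE };

/*
 * Choose the tag for an outgoing message based on its operation.
 * Requests are consumed by the destination's server thread;
 * replies are consumed by the client thread that is waiting for them.
 * Returns -1 for an unknown operation.
 */
int tag_for(int op)
{
    switch (op) {
    case RPC_PUT:
    case RPC_GET:
        return TAG_REQUEST;
    case RPC_ACK:
    case RPC_VALUE:
        return TAG_REPLY;
    default:
        return -1;
    }
}
```

Under this scheme the server thread would receive only messages tagged TAG_REQUEST (from any source), while a client waiting on a remote get would receive only a TAG_REPLY message from the key's owner.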

HINT: Don't forget that you will need some method of thread synchronization if you make modifications to the local hash table from both the server and the client threads.
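
The sketch below shows one way to do this with a single Pthreads mutex. It does not use the real local table: demo_table, DEMO_NOT_FOUND, and the locked_* functions are hypothetical stand-ins (the real functions and the KEY_NOT_FOUND value live in local.h), included only so the example is self-contained. The point is the pattern: every access to the shared table, from either thread, happens between a lock and an unlock of the same mutex.

```c
#include <assert.h>
#include <pthread.h>
#include <string.h>

#define DEMO_CAPACITY  8
#define DEMO_KEYLEN    64
#define DEMO_NOT_FOUND (-1L)   /* stand-in for KEY_NOT_FOUND from local.h */

/* Tiny stand-in for the local table (not the project's implementation). */
static struct { char key[DEMO_KEYLEN]; long value; } demo_table[DEMO_CAPACITY];
static int demo_count = 0;

/* One lock guards every access to the table, from either thread. */
static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;

/* Unlocked helper: index of key, or -1 if absent. Call with the lock held. */
static int demo_find(const char *key)
{
    for (int i = 0; i < demo_count; i++) {
        if (strcmp(demo_table[i].key, key) == 0) {
            return i;
        }
    }
    return -1;
}

/* Create or update a pair while holding the lock. */
void locked_put(const char *key, long value)
{
    assert(key != NULL && strlen(key) < DEMO_KEYLEN);
    pthread_mutex_lock(&table_lock);
    int i = demo_find(key);
    if (i < 0 && demo_count < DEMO_CAPACITY) {
        i = demo_count++;
        strncpy(demo_table[i].key, key, DEMO_KEYLEN - 1);
        demo_table[i].key[DEMO_KEYLEN - 1] = '\0';
    }
    if (i >= 0) {
        demo_table[i].value = value;
    }
    pthread_mutex_unlock(&table_lock);
}

/* Look up a value while holding the lock. */
long locked_get(const char *key)
{
    assert(key != NULL);
    pthread_mutex_lock(&table_lock);
    int i = demo_find(key);
    long v = (i >= 0) ? demo_table[i].value : DEMO_NOT_FOUND;
    pthread_mutex_unlock(&table_lock);
    return v;
}

/* Number of stored pairs, read under the lock. */
int locked_size(void)
{
    pthread_mutex_lock(&table_lock);
    int n = demo_count;
    pthread_mutex_unlock(&table_lock);
    return n;
}
```

In the project, the same lock-call-unlock pattern would wrap the calls to the local_* functions instead of a stand-in table.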

HINT: You will need to make sure that the server threads do not terminate until all nodes are done sending messages. The most common way to do this cleanly is to have the server thread go into an "infinite" loop, stopping when it receives a message (or messages) indicating that it is safe to do so. You should NOT use any of the Pthread methods for forcefully killing the thread, such as pthread_cancel() or pthread_kill().
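
The loop-until-told-to-stop pattern can be sketched without MPI as follows. This is only an illustration under stated assumptions: a POSIX pipe stands in for incoming MPI messages, and the names MSG_WORK, MSG_SHUTDOWN, demo_init(), demo_destroy(), and send_msg() are hypothetical. Deciding when it is actually safe to send the shutdown message (for example, after every client has finished its commands) is part of your protocol design and is not shown here.

```c
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical message kinds; a real protocol would carry put/get requests. */
enum { MSG_WORK = 1, MSG_SHUTDOWN = 2 };

static int channel[2];      /* pipe standing in for incoming MPI messages */
static int handled = 0;     /* number of MSG_WORK messages processed */
static pthread_t server;

/* Server thread: loop "forever", leaving only when told it is safe to stop. */
static void *server_main(void *arg)
{
    (void)arg;
    for (;;) {
        int msg;
        /* In the project this would be a blocking MPI receive. */
        if (read(channel[0], &msg, sizeof(msg)) != (ssize_t)sizeof(msg)) {
            break;          /* channel closed or error: stop rather than spin */
        }
        if (msg == MSG_SHUTDOWN) {
            break;
        }
        handled++;          /* stand-in for delegating to a local_* function */
    }
    return NULL;
}

/* Deliver one message to the server thread. */
static void send_msg(int msg)
{
    ssize_t n = write(channel[1], &msg, sizeof(msg));
    assert(n == (ssize_t)sizeof(msg));
    (void)n;
}

/* Start the server thread (the role dht_init() plays in the project). */
static void demo_init(void)
{
    int rc = pipe(channel);
    assert(rc == 0);
    rc = pthread_create(&server, NULL, server_main, NULL);
    assert(rc == 0);
    (void)rc;
}

/* Ask the server to stop and wait for it (the role of dht_destroy()). */
static int demo_destroy(void)
{
    send_msg(MSG_SHUTDOWN);
    pthread_join(server, NULL);   /* cooperative exit, no pthread_cancel() */
    close(channel[0]);
    close(channel[1]);
    return handled;
}
```

Because the pipe preserves order, every message sent before the shutdown request is processed before the thread exits, and pthread_join() guarantees the thread has finished before cleanup continues. Compile with the -pthread flag.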

HINT: As implied by the MAX_KEYLEN macro in local.h, you may assume that all keys will be less than 64 characters long.

HINT: Only the client thread (i.e., the code in main()) should call the dht_* API functions; all of the code you write should either be internal to dht.c or delegate to the local_* versions.

Grading Criteria

Your submission will be graded on a combination of correctness, functionality, elegance, style, and documentation. Here is a tentative grading rubric:

Grade Description Requirements
A Exceptional
  • Correct remote results for all commands.
  • Clean code with fully consistent style.
  • No calls to unsafe functions.
  • Insightful documentation.
  • All of the below.
B Good
  • Consistent and reasonable style.
  • Correct results for remote "sync" and "get" commands.
  • No memory leaks or other memory errors.
  • No compiler warnings.
  • Useful and consistent documentation.
  • All of the below.
C Satisfactory
  • Compiles without modifications.
  • Correct results for local calls.
  • Correct results for remote "put" commands.
  • Readable style.
  • Minimal amount of documentation.
  • All of the below.
D Deficient
  • Some evidence of a good-faith attempt.
F Unacceptable

Submission

You should submit your modified program as dht.c on Canvas by the due date. You should not submit your Makefile or anything else, and your program must compile and run on the cluster using the reference version of the other files.

All submitted code should be elegant, clean, consistent, and well-documented. The code I have provided uses 4-space indentation and the 1TBS style. If you choose to reformat the file to match your own style preferences, make sure you apply the changes throughout.

Peer Reviews

One of the goals of this course is to encourage good software development practices, especially when building a parallel or distributed software system. For this project, you will be randomly assigned the submissions of two other students in the course. You must review their code and offer constructive feedback according to the given rubric.

After the submission deadline, I will assign you two other submissions to review on Canvas. I expect you to review two distinct submissions that are not your own; I will do my best to ensure that the assignments work out but sometimes Canvas does not cooperate. It is your responsibility to inform me immediately if you have been assigned your own submission, your partner's submission, or two submissions from the same team.

To complete the peer review, submit a Canvas comment on the submission with your assessment of the submission according to the provided code review guidelines. I expect at least two paragraphs per review with detailed observations and an assigned overall numeric score. You will be graded on the quality, thoroughness, and professional tone of your review. The peer reviews are due one week after the original project due date.