Creating a BitTorrent client in Haskell #3

Jakub Okoński – November 24th, 2015

Haskell, BitTorrent

In the previous post, I laid the foundations for PeerMonad, where most of the protocol logic will live. This article focuses on using it to reach our goal: downloading files.

Let’s quickly recap the first actions our client takes. First, it asks the tracker for peers, then it initiates connections to all of them and enters peer loops.

Conceptually, our peer loop boils down to this:

forever (getPeerEvent >>= handleEvent)

The peer loop is entered after handshakes. For peers that have completed at least one piece, the first message they send is a bitfield with their progress. We need this information to know which pieces can be downloaded from this peer, so let’s remember it.

You might recall from the first post that peers start out as uninterested and choked. To get to the point where we can send data messages and download files, the peer has to unchoke us first. For this purpose, the Interested message is sent out.

Let’s handle the first message:

handleEvent (PWPEvent pwp) = handlePWP pwp

handlePWP (Bitfield field) = do
  peerData <- getPeerData
  newBitField <- runMemory $ do
    len <- BF.length <$> getBitfield
    let newBitField = BF.BitField field len
    modifyAvailability $ PS.addToAvailability newBitField
    return newBitField
  updatePeerData $ peerData { peerBitField = newBitField }
  emit Interested
  • Make a BitField out of the raw bytestring with the same length as ours.
  • Update the availability cache with this new bitfield.
  • Store the bitfield in our local PeerData structure.
  • Emit the Interested message by default. This is short-sighted, but should do for now.

When the peer finally unchokes us, we can start downloading:

handlePWP Unchoke = do
  peerData <- getPeerData
  updatePeerData $ peerData { peerChoking = False }
  requestNextChunk

We’ll get into requestNextChunk soon enough, but first, let’s look at how downloads are split up in the protocol.

Every torrent is divided into pieces, with the piece size given in the MetaInfo dictionary. The last piece is allowed to be smaller. Each piece is further split up into chunks (also called blocks). Their size is implementation-defined and, from what I could find, is usually either 2^14 or 2^15 bytes. There’s some discussion on the wiki describing the history behind this limit. 2^14 appears to be the safer bet for compatibility reasons. Within a piece, the last chunk is allowed to be smaller too.
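
To make the size arithmetic concrete, here is a small sketch of how piece and chunk sizes could be computed. The helper names (chunkSize, divCeil, pieceSizeAt, chunkCount, chunkSizeAt) and the use of plain Int are assumptions made for illustration; they are not the client’s actual definitions.

```haskell
-- Hypothetical helpers: names and Int types are assumptions for this sketch.

-- Assumed chunk size of 2^14 bytes (16 KiB).
chunkSize :: Int
chunkSize = 2 ^ (14 :: Int)

-- Integer division rounding up, for non-negative a and positive b.
divCeil :: Int -> Int -> Int
divCeil a b = (a + b - 1) `div` b

-- Size in bytes of the piece at index ix; only the last piece may be shorter.
pieceSizeAt :: Int -> Int -> Int -> Int
pieceSizeAt totalSize pieceLen ix
  | ix == lastIx = totalSize - lastIx * pieceLen
  | otherwise    = pieceLen
  where
    lastIx = divCeil totalSize pieceLen - 1

-- Number of chunks needed to cover a piece of the given size.
chunkCount :: Int -> Int
chunkCount pieceSize = divCeil pieceSize chunkSize

-- Size in bytes of chunk cix within a piece; only the last chunk may be shorter.
chunkSizeAt :: Int -> Int -> Int
chunkSizeAt pieceSize cix = min chunkSize (pieceSize - cix * chunkSize)
```

For a 100,000-byte torrent with 40,000-byte pieces, the last piece comes out at 20,000 bytes, and a 40,000-byte piece splits into three chunks, the last one being 7,232 bytes.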

Chunk fields

For holding the download status of chunks within a piece, I created this structure:

data ChunkField = ChunkField
  { missingChunks   :: IntSet
  , requestedChunks :: IntSet
  , completedChunks :: IntSet
  }

newChunkField :: Int -> ChunkField
newChunkField n = ChunkField (IntSet.fromList [0..(n-1)])
                              IntSet.empty
                              IntSet.empty

It starts out by storing all chunk indexes in the missing set. From there, we have access to simple operations like marking a chunk as requested or completed, asking if all chunks were completed or at least requested. There’s also a function that returns the next missing chunk (if there is one) and marks it as requested for us:

getNextChunk :: ChunkField -> Maybe (ChunkField, ChunkId)

This is the basis of our chunk selection algorithm, which returns missing chunks in ascending order of indexes.
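
The operations described above could look roughly like the sketch below. It repeats the data type so that the block stands on its own, and it uses a plain Int in place of ChunkId; the bodies are my guess at a reasonable implementation, not necessarily what the client does.

```haskell
import           Data.IntSet (IntSet)
import qualified Data.IntSet as IntSet

data ChunkField = ChunkField
  { missingChunks   :: IntSet
  , requestedChunks :: IntSet
  , completedChunks :: IntSet
  } deriving (Eq, Show)

newChunkField :: Int -> ChunkField
newChunkField n =
  ChunkField (IntSet.fromList [0 .. n - 1]) IntSet.empty IntSet.empty

-- Take the lowest missing index and move it to the requested set.
getNextChunk :: ChunkField -> Maybe (ChunkField, Int)
getNextChunk cf = do
  (ix, rest) <- IntSet.minView (missingChunks cf)
  pure ( cf { missingChunks   = rest
            , requestedChunks = IntSet.insert ix (requestedChunks cf) }
       , ix )

-- Record a chunk as completed, whatever set it was in before.
markCompleted :: ChunkField -> Int -> ChunkField
markCompleted cf ix = cf
  { missingChunks   = IntSet.delete ix (missingChunks cf)
  , requestedChunks = IntSet.delete ix (requestedChunks cf)
  , completedChunks = IntSet.insert ix (completedChunks cf) }

-- Give a requested chunk back; completed chunks are left untouched.
markMissing :: ChunkField -> Int -> ChunkField
markMissing cf ix
  | ix `IntSet.member` requestedChunks cf = cf
      { missingChunks   = IntSet.insert ix (missingChunks cf)
      , requestedChunks = IntSet.delete ix (requestedChunks cf) }
  | otherwise = cf

-- A piece is done when nothing is missing or still in flight.
isCompleted :: ChunkField -> Bool
isCompleted cf =
  IntSet.null (missingChunks cf) && IntSet.null (requestedChunks cf)
```

IntSet.minView is what gives the ascending order: it always hands back the smallest missing index.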

Now that we can store progress within a piece, describing progress of all pieces that are being downloaded is simple:

type Chunks = Map PieceId (ChunkField, ByteString)

For every piece that is being downloaded, it stores a ChunkField and the buffer for the final data. Keeping that buffer around can be quite expensive; other, more mature clients write every chunk to disk and verify the piece later, once it is completed. For simplicity, we’ll keep it in memory until all chunks are completed.

To sum up: a BitField tells us which pieces are completed, and a ChunkField stores the progress of a single piece.

Requesting chunk by chunk

Let’s see the requestNextChunk function.

requestNextChunk :: F PeerMonad ()
requestNextChunk = do
  peerData <- getPeerData
  meta <- getMeta
  when (not (peerChoking peerData) && requestsLive peerData < maxRequestsPerPeer) $ do
    operation <- runMemory $ nextRequestOperation peerData meta

    case operation of
      Just (RequestChunk pieceId chunkId pwpRequest) -> do
        let modifiedPeer = peerData { requestsLive = requestsLive peerData + 1 }
        updatePeerData modifiedPeer
        emit pwpRequest
        requestNextChunk
      Nothing -> return ()
  • Some conditions have to be checked at the beginning. The peer must not be choking us and we cannot exceed the maxRequestsPerPeer limit. This limit is important for balancing work and scaling the bandwidth used. I’ll describe it in future posts dedicated to the topic. For now it’s used to forbid peer loops from claiming too many pieces for themselves.

  • It relies on another function, nextRequestOperation, to inspect shared memory, make some changes and come up with an operation for us. When it tells us to request a specific chunk, we do just that.

  • The request is sent and the operation tried again.

Let’s dig into the nextRequestOperation.

data RequestOperation = RequestChunk PieceId ChunkId PWP

nextRequestOperation :: PeerData
                     -> MetaInfo
                     -> F MemoryMonad (Maybe RequestOperation)
nextRequestOperation peerData meta = do
  chunks <- getChunks
  avData <- getAvailability
  ourBitField <- getBitfield

  let peersBitField = peerBitField peerData
      infoDict = info meta
      defaultPieceLen = pieceLength infoDict
      totalSize = Meta.length infoDict

      requestedBitField = BF.fromChunkFields (BF.length peersBitField)
                                             (Map.toList (fst <$> chunks))
      completedBitField = BF.union ourBitField requestedBitField
      pendingBitField   = BF.difference peersBitField completedBitField

  case PS.getNextPiece pendingBitField avData of
    Nothing -> return Nothing
    Just piece ->
      let pieceLen = expectedPieceSize totalSize defaultPieceLen piece
      in case Map.lookup piece chunks of
        Nothing -> do
          let chunksCount = chunksInPiece pieceLen defaultChunkSize
              chunkData = B.replicate (fromIntegral pieceLen) 0
              chunkField = CF.newChunkField (fromIntegral chunksCount)
              chunkInfo = (chunkField, chunkData)
          claimChunk chunkInfo piece
        Just chunkInfo -> claimChunk chunkInfo piece
  • Shared variables needed to make a decision are read.

  • Starting from our bitfield of completed pieces, we add the fully requested ones. The result is subtracted from the peer’s bitfield to get the full list of pieces that we can request from this peer. getNextPiece implements a rarest-first piece selection algorithm.

  • The piece we were told to download is looked up in the map. A new ChunkField is inserted if this piece has not been started yet. Finally, a single chunk is claimed and returned as an operation.

In the code above, claimChunk is used, so let’s take a look at it. I promise this is the last piece (😃) of requesting logic for now.

claimChunk :: (CF.ChunkField, B.ByteString)
           -> PieceId
           -> F MemoryMonad (Maybe RequestOperation)
claimChunk (chunkField, chunkData) piece@(PieceId pieceId) =
  case CF.getNextChunk chunkField of
    Just (chunkField', chunk@(ChunkId chunkId)) -> do
      let request = Request pieceId
                            (chunkId * defaultChunkSize)
                            (getChunkSize piece chunk)
      modifyChunks $ Map.insert piece (chunkField', chunkData)
      return $ Just $ RequestChunk piece chunk request
    _ -> return Nothing
  where getChunkSize :: PieceId -> ChunkId -> Word32

When we have a chunk to request here, we do just that. A RequestChunk operation is returned to the caller.
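
To make the piece selection in nextRequestOperation more concrete, here is a simplified model of it. It represents bitfields as sets of piece indexes and the availability cache as a map from piece index to the number of peers that have it. Both representations, and the names pendingPieces and rarestPiece, are assumptions for this sketch; the real BF and PS modules may work quite differently.

```haskell
import           Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IntMap
import           Data.IntSet (IntSet)
import qualified Data.IntSet as IntSet
import           Data.List (minimumBy)
import           Data.Ord (comparing)

-- Pieces the peer has that we neither own nor have fully requested.
pendingPieces :: IntSet -> IntSet -> IntSet -> IntSet
pendingPieces ours fullyRequested peers =
  peers `IntSet.difference` (ours `IntSet.union` fullyRequested)

-- Pick the pending piece held by the fewest peers.
-- Ties are broken explicitly in favour of the lowest piece index.
rarestPiece :: IntMap Int -> IntSet -> Maybe Int
rarestPiece availability pending
  | IntSet.null pending = Nothing
  | otherwise           = Just (minimumBy (comparing rank) candidates)
  where
    candidates = IntSet.toAscList pending
    rank p     = (IntMap.findWithDefault 0 p availability, p)
```

With our pieces {0}, fully requested pieces {1} and a peer that has {0,1,2,3}, the pending set is {2,3}; if piece 3 is held by fewer peers than piece 2, it is chosen first.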

Exception handling

The code so far has one fatal flaw: it relies on peers having a flawless connection and uptime. In reality, there is a whole class of network errors, not to mention buggy client implementations or a user closing the application that’s currently seeding to us. All of these things can happen and we have to be prepared for them.

It won’t be all that complicated, though. Forgetting about the resources allocated by the implementation of PeerMonad for now, the peer loop itself operates only on PeerData and shared memory. When an exception happens, there is nothing in PeerData that needs cleaning up – it only stores general information about the peer.

The story is different with shared state. We need to remember which chunks have been requested, so that if the connection is lost, we can mark them as missing again. The piece availability cache should also be updated when the peer loop closes. No additional data is needed here: it can be done by simply taking the peer’s bitfield and removing it from the cache.

To remember which chunks have been requested, let’s add registerActiveChunk and deregisterActiveChunk as operations to our PeerMonad. They take both piece and chunk ids and store or remove them. To retrieve this data, we’ll define getActiveChunks too. Because events are used for anything that comes in from the outside, we get a nice property in return: exceptions are not thrown at us immediately, they arrive as events. This is useful for any code that behaves similarly to:

(piece, chunk) <- runMemory $ do
  -- claim a chunk
registerActiveChunk piece chunk

Usually, we can only be sure that STM will handle exceptions and always keep us in a consistent state. Thanks to errors delivered as events, we also know that no exception can happen before registerActiveChunk in this example. This guarantee will help in making the client more reliable.

Finally, our error handler might look like this:

-- | Marks the chunk as missing and deregisters the active chunk.
releaseActiveChunk :: PieceId -> ChunkId -> F PeerMonad ()
releaseActiveChunk pieceId chunkId = do
  runMemory $ modifyChunks $ \chunks ->
    case Map.lookup pieceId chunks of
      Just (cf, d) ->
        let cf' = CF.markMissing cf chunkId
        in Map.insert pieceId (cf', d) chunks
      Nothing -> chunks

  deregisterActiveChunk pieceId chunkId

onError = do
  activeChunks <- getActiveChunks
  let keys = Map.toList activeChunks
  traverse_ (uncurry releaseActiveChunk) keys

Receiving data

We’ve seen how to request data from the peer. It’s now time to process the response.

Let’s start with the function that receives raw data for a chunk.

receiveChunk :: PieceId -> Word32 -> ByteString -> F PeerMonad ()
receiveChunk piece offset d = do
  let chunkIndex = ChunkId (divideSize offset defaultChunkSize)

  wasMarked <- runMemory $ do
    chunks <- getChunks
    case Map.lookup piece chunks of
      Just (chunkField, chunkData) -> do
        do
          -- copy the incoming data into appropriate place in chunkData
          let (ptr, o, len) = BI.toForeignPtr chunkData
              chunkVector = VS.unsafeFromForeignPtr ptr o len
              (ptr', o', len') = BI.toForeignPtr d
              dataVector = VS.unsafeFromForeignPtr ptr' o' len'
              dest = VS.take (B.length d) $ VS.drop (fromIntegral offset) chunkVector
              src = dataVector
          unsafePerformIO $ VS.copy dest src >> return (return ())
        let chunkField' = CF.markCompleted chunkField chunkIndex

        modifyChunks $ Map.insert piece (chunkField', chunkData)
        return True
      _ -> return False -- someone already filled this

  deregisterActiveChunk piece chunkIndex
  pData <- getPeerData
  updatePeerData (pData { requestsLive = requestsLive pData - 1 })
  when wasMarked $ processPiece piece
  • Modify the buffer by copying the new data into the appropriate place. This is done by converting bytestrings to storable vectors. I will probably change it in the future; a Seq ByteString, concatenated using a builder, seems like a good choice.
  • Mark the chunk as completed.
  • wasMarked is returned out of the transaction telling if the chunk was completed successfully.
  • The active chunk is forgotten, so it won’t be released in case of an exception.
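
For comparison, the copy step can also be written without any unsafe operations by splicing bytestrings. This is a different technique from the in-place copy used above: it allocates a new buffer for every chunk, so it trades speed for simplicity. The name writeAt is hypothetical.

```haskell
import qualified Data.ByteString as B

-- Return a copy of the buffer with the chunk written at the given offset.
-- Writes that would fall outside the buffer are rejected with Nothing.
writeAt :: Int -> B.ByteString -> B.ByteString -> Maybe B.ByteString
writeAt offset chunk buffer
  | offset < 0 || end > B.length buffer = Nothing
  | otherwise = Just (B.concat [B.take offset buffer, chunk, B.drop end buffer])
  where
    end = offset + B.length chunk
```

The bounds check also matters in practice: the offset and payload come from the peer, so they should not be trusted to fit inside the piece buffer.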

When the chunk was successfully marked as completed, processPiece is called. Its job is to check if the piece is now completed and proceed accordingly if so:

processPiece :: PieceId -> F PeerMonad ()
processPiece piece@(PieceId pieceId) = do
  meta <- getMeta
  let infoDict = info meta
      defaultPieceLen = pieceLength infoDict

  dataToWrite <- runMemory $ do
    Just (chunkField, d) <- Map.lookup piece <$> getChunks
    let getPieceHash (PieceId n) =
          B.take 20 $ B.drop (fromIntegral n * 20) $ pieces infoDict

    case CF.isCompleted chunkField of
      True -> do
        modifyChunks $ Map.delete piece
        if hash d == getPieceHash piece
          then do
            bitfield <- getBitfield
            let newBitfield = BF.set bitfield pieceId True
            modifyBitfield (const newBitfield)
            modifyAvailability (PS.addToAvailability newBitfield .
                                PS.removeFromAvailability bitfield)
            return (Just d)
          else return Nothing
      False -> return Nothing

  case dataToWrite of
    Just d -> writeData (defaultPieceLen * pieceId) d
    Nothing -> return ()
  • When all the chunks are completed, we remove download progress for this piece.
  • A hash check is performed to verify the completed piece. I’m using the Crypto.Hash library here.
  • Bitfield and availability cache are updated.
  • Data is written to disk.

When the piece is not completed, we short-circuit with no data to write and leave this function. If the hash check fails, our bitfield is not updated, but the ChunkField is still removed. This means that the piece remains marked as incomplete and its download progress is discarded. It will be retried just like any other missing piece.
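
The hash lookup relies on the pieces field being a concatenation of 20-byte SHA-1 digests, one per piece. Here is a small sketch of that lookup with a bounds check added. The names pieceHashAt and pieceIsValid are hypothetical, and the hash function is passed in as a parameter so the block does not depend on any particular crypto library.

```haskell
import qualified Data.ByteString as B

-- The 20-byte digest for piece n, or Nothing if n is out of range.
pieceHashAt :: B.ByteString -> Int -> Maybe B.ByteString
pieceHashAt piecesField n
  | n < 0 || B.length digest /= 20 = Nothing
  | otherwise                      = Just digest
  where
    digest = B.take 20 (B.drop (n * 20) piecesField)

-- Compare the digest of the downloaded data with the expected one.
-- hashFn stands in for a real SHA-1 implementation.
pieceIsValid :: (B.ByteString -> B.ByteString)
             -> B.ByteString -> Int -> B.ByteString -> Bool
pieceIsValid hashFn piecesField n pieceData =
  pieceHashAt piecesField n == Just (hashFn pieceData)
```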

A simple heuristic

Exception handling is needed and will help in many cases, but there is one more thing we should add. Imagine a scenario where, after we request a chunk from a peer, it does not reply. The connection is alive and no exceptions were raised, but we still can’t get a reply from the other end.

For this purpose, I extended the registerActiveChunk functionality to record a timestamp whenever an active chunk is registered. This allows us to periodically check if a request to a peer timed out. Since we already have code that releases a chunk and marks it back as missing, it’s easy to implement.
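
A rough model of this timeout check is sketched below. It assumes the active chunks are kept in a map keyed by (piece, chunk) with the registration time as the value, and it measures time in whole seconds. The type and function names here are made up for the example and may not match the client’s actual structures.

```haskell
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

type PieceIx = Int
type ChunkIx = Int
type Seconds = Int

-- Active requests with the time at which each was registered.
type ActiveChunks = Map (PieceIx, ChunkIx) Seconds

-- Remember a request together with the current time.
registerAt :: Seconds -> (PieceIx, ChunkIx) -> ActiveChunks -> ActiveChunks
registerAt now key = Map.insert key now

-- Requests that have been waiting longer than the limit.
timedOut :: Seconds -> Seconds -> ActiveChunks -> [(PieceIx, ChunkIx)]
timedOut limit now active =
  Map.keys (Map.filter (\registered -> now - registered > limit) active)
```

Each key returned by timedOut can then go through the same release path as in the error handler, which marks the chunk as missing so another peer can pick it up.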

Closing thoughts

Using PeerMonad from the previous article, we’ve implemented the download flow for our client. It requests pieces rarest first, chunk by chunk. We’ve also made big steps in reliability: peers disconnecting or refusing to serve data won’t stop us from completing the torrent.

A single-file torrent, that is – a limitation I hope to address in the next article.
