Skip to content

Commit

Permalink
Merge pull request #49 from stavalfi/start_over2
Browse files Browse the repository at this point in the history
Created the final POC for v1 algorithm
  • Loading branch information
stavalfi committed Sep 8, 2018
2 parents f06be25 + 7167ca0 commit 9a639e7
Show file tree
Hide file tree
Showing 47 changed files with 964 additions and 568 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>UniversityFinalProjects</groupId>
<artifactId>torrentx</artifactId>
<version>1.0-SNAPSHOT</version>
<version>2.0.0</version>
<packaging>jar</packaging>

<name>torrentx</name>
Expand Down
78 changes: 44 additions & 34 deletions src/main/java/main/App.java
Original file line number Diff line number Diff line change
@@ -1,68 +1,73 @@
package main;

import christophedetroyer.torrent.TorrentParser;
import main.downloader.PieceEvent;
import main.downloader.TorrentDownloader;
import main.downloader.TorrentDownloaderBuilder;
import main.downloader.TorrentDownloaders;
import main.file.system.FileSystemLink;
import main.peer.IncomingPeerMessagesNotifier;
import main.listener.ListenerAction;
import main.peer.Link;
import main.peer.SendMessagesNotifications;
import main.peer.peerMessages.RequestMessage;
import main.peer.peerMessages.HaveMessage;
import main.torrent.status.TorrentStatusAction;
import reactor.core.publisher.Flux;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class App {
public static Scheduler timeoutScheduler = Schedulers.newSingle("TIMEOUT");
public static Scheduler timeoutFallbackScheduler = Schedulers.newSingle("TIMEOUT-FALLBACK");
private static Logger logger = LoggerFactory.getLogger(App.class);

private static String downloadPath = System.getProperty("user.dir") + File.separator + "torrents-test" + File.separator;

private static void f5() throws IOException, InterruptedException {
TorrentDownloader torrentDownloader$ = TorrentDownloaderBuilder.buildDefault(getTorrentInfo(), "App", downloadPath);
TorrentDownloaders.getInstance().saveTorrentDownloader(torrentDownloader$);

torrentDownloader$.getIncomingPeerMessagesNotifier()
.getIncomingPeerMessages$()
.map(AbstractMap.SimpleEntry::getValue)
TorrentDownloader torrentDownloader = TorrentDownloaderBuilder.buildDefault(getTorrentInfo(), "App", downloadPath);

torrentDownloader.getFileSystemLink()
.savedPieces$()
.map(completedPieceIndex ->
IntStream.range(0, torrentDownloader.getTorrentInfo().getPieces().size())
.mapToObj(pieceIndex -> pieceIndex == completedPieceIndex ? "*" : torrentDownloader.getFileSystemLink().havePiece(pieceIndex) ? "1" : "0")
.collect(Collectors.joining()))
.subscribe(System.out::println);

torrentDownloader$.getPeersCommunicatorFlux()
// torrentDownloader.getIncomingPeerMessagesNotifier()
// .getPieceMessageResponseFlux()
// .subscribe(System.out::println);

torrentDownloader.getPeersCommunicatorFlux()
.map(Link::sendMessages)
.flatMap(SendMessagesNotifications::sentPeerMessages$)
.filter(peerMessage -> peerMessage instanceof RequestMessage)
.cast(RequestMessage.class)
.map(requestMessage -> "request: index: " + requestMessage.getIndex() +
", begin: " + requestMessage.getBegin() + ", from: " + requestMessage.getTo())
.subscribe(System.out::println, Throwable::printStackTrace);

torrentDownloader$.getFileSystemLink()
.savedBlocks$()
.map(PieceEvent::getReceivedPiece)
.map(pieceMessage -> "received: index: " + pieceMessage.getIndex() +
", begin: " + pieceMessage.getBegin() + ", from: " + pieceMessage.getFrom())
.subscribe(System.out::println, Throwable::printStackTrace);

torrentDownloader$.getTorrentStatusStore().dispatchNonBlocking(TorrentStatusAction.START_DOWNLOAD_IN_PROGRESS);
torrentDownloader$.getTorrentStatusStore().dispatchNonBlocking(TorrentStatusAction.START_UPLOAD_IN_PROGRESS);
torrentDownloader$.getTorrentStatusStore().dispatchNonBlocking(TorrentStatusAction.START_SEARCHING_PEERS_IN_PROGRESS);
.filter(peerMessage -> peerMessage instanceof HaveMessage)
.cast(HaveMessage.class)
.map(haveMessage -> "sent: " + haveMessage.toString())
.subscribe(System.out::println);

TorrentDownloaders.getListenStore().dispatchNonBlocking(ListenerAction.START_LISTENING_IN_PROGRESS);
torrentDownloader.getTorrentStatusStore().dispatchNonBlocking(TorrentStatusAction.START_DOWNLOAD_IN_PROGRESS);
torrentDownloader.getTorrentStatusStore().dispatchNonBlocking(TorrentStatusAction.START_UPLOAD_IN_PROGRESS);
torrentDownloader.getTorrentStatusStore().dispatchNonBlocking(TorrentStatusAction.START_SEARCHING_PEERS_IN_PROGRESS);
}


public static void main(String[] args) throws Exception {
deleteDownloadFolder();
Hooks.onOperatorDebug();
f5();
Thread.sleep(1000 * 1000);
Thread.sleep(10000 * 1000);
}

private static void deleteDownloadFolder() {
Expand Down Expand Up @@ -109,7 +114,12 @@ private static TorrentInfo getTorrentInfo() throws IOException {
"main" + File.separator +
"resources" + File.separator +
"torrents" + File.separator +
"tor.torrent";
return new TorrentInfo(torrentFilePath, TorrentParser.parseTorrent(torrentFilePath));
"torrent2.torrent";
TorrentInfo torrentInfo = new TorrentInfo(torrentFilePath, TorrentParser.parseTorrent(torrentFilePath));
System.out.println(torrentInfo);
System.out.println("--------------------------------------");
System.out.println("--------------------------------------");
System.out.println("--------------------------------------");
return torrentInfo;
}
}
6 changes: 6 additions & 0 deletions src/main/java/main/algorithms/PeersToPiecesMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,10 @@ public interface PeersToPiecesMapper {
* @return flux of pieces numbers.
*/
Flux<GroupedFlux<Integer, Link>> getLinksByAvailableMissingPiece$();

Flux<Integer> availablePieces$();

Flux<Link> linksForPiece$(int pieceIndex);

void dispose();
}
2 changes: 1 addition & 1 deletion src/main/java/main/algorithms/PiecesDownloader.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
import reactor.core.publisher.Mono;

public interface PiecesDownloader {
Mono<Integer> downloadPiece$(int pieceIndex);
Flux<Integer> getDownloadedPieces$();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@ public static BittorrentAlgorithm v1(AllocatorStore allocatorStore,
IncomingPeerMessagesNotifier incomingPeerMessagesNotifier,
Flux<Link> peersCommunicatorFlux,
String identifier) {
Flux<Link> recordedPeerFlux = peersCommunicatorFlux
.flatMap(peersCommunicator ->
peersCommunicator.sendMessages().sendInterestedMessage()
.map(sendPeerMessages -> peersCommunicator))
.replay()
.autoConnect();
Flux<Link> recordedPeerFlux = peersCommunicatorFlux.replay().autoConnect();

NotifyAboutCompletedPieceAlgorithm notifyAboutCompletedPieceAlgorithm =
new NotifyAboutCompletedPieceAlgorithmImpl(torrentInfo,
Expand All @@ -50,6 +45,7 @@ public static BittorrentAlgorithm v1(AllocatorStore allocatorStore,

PeersToPiecesMapper peersToPiecesMapper =
new PeersToPiecesMapperImpl(torrentInfo,
fileSystemLink,
incomingPeerMessagesNotifier,
recordedPeerFlux,
fileSystemLink.getUpdatedPiecesStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@
import main.downloader.PieceEvent;
import main.file.system.FileSystemLink;
import main.peer.Link;
import main.peer.PeerExceptions;
import main.peer.SendMessagesNotifications;
import main.peer.peerMessages.RequestMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

public class BlockDownloaderImpl implements BlockDownloader {
private static Logger logger = LoggerFactory.getLogger(BlockDownloaderImpl.class);
private static Scheduler downloadBlockScheduler = Schedulers.newParallel("DOWNLOAD-BLOCK", 1);

private TorrentInfo torrentInfo;
private FileSystemLink fileSystemLink;
Expand Down Expand Up @@ -45,13 +50,15 @@ public Mono<PieceEvent> downloadBlock(Link link, RequestMessage requestMessage)
.limitRequest(1)
.single();

Mono<SendMessagesNotifications> sendRequestMessage$ = link.sendMessages()
.sendRequestMessage(requestMessage.getIndex(), requestMessage.getBegin(), requestMessage.getBlockLength());
Mono<SendMessagesNotifications> sendRequestMessage$ = link.sendMessages().sendRequestMessage(requestMessage);

return Mono.zip(savedPiece$, sendRequestMessage$, (pieceEvent, sendMessagesNotifications) -> pieceEvent)
.doOnSubscribe(__ -> logger.debug(this.identifier + " - start sending request message: " + requestMessage))
.subscribeOn(downloadBlockScheduler)
// TODO: there maybe a situation where we already got the piece but after the timeout so in the next time we request it, we won't see it in this implementation because we don't replay saved pieces from FS.
.timeout(Duration.ofMillis(2500))
.onErrorResume(PeerExceptions.isTimeoutException.and(__ -> fileSystemLink.havePiece(requestMessage.getIndex())), throwable -> Mono.error(new DownloadingSavedPieceException(requestMessage.getIndex())))
.doOnError(TimeoutException.class, throwable -> logger.debug(this.identifier + " - no response to the request: " + requestMessage))
.doOnNext(__ -> logger.debug(this.identifier + " - end sending request message: " + requestMessage));
.doOnNext(__ -> logger.debug(this.identifier + " - received block for request: " + requestMessage));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main.algorithms.impls.v1.download;

public class DownloadingSavedPieceException extends Exception{
private int completedPieceIndex;
public DownloadingSavedPieceException(int completedPieceIndex) {
this.completedPieceIndex=completedPieceIndex;
}

public int getCompletedPieceIndex() {
return completedPieceIndex;
}
}

0 comments on commit 9a639e7

Please sign in to comment.