package fireTester.communicator.client;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;

import org.mr.api.blocks.ScalableDispatcher;
import org.mr.api.blocks.ScalableFactory;
import org.mr.api.blocks.ScalableHandler;
import org.mr.api.blocks.ScalableStage;

import com.dragonsoft.tryapp.common.SubmissionObj;

import fireTester.interfaces.ClientController;
import fireTester.interfaces.SubmissionTest;
import fireTester.messages.AcknowledgeMessage;
import fireTester.messages.TestUnitResults;
import fireTester.messages.TesterException;
import fireTester.messages.UnknownException;
import fireTester.tests.batch.BatchCommand;
import fireTester.tests.batch.BatchReturnType;

/**
 * This is the main class for a distributed client.
 * <p>
 * A client registers itself as a handler on the "FireByDragonsoft_Clients"
 * stage, queues every {@link SubmissionTest} it receives, and executes the
 * queued tests one at a time on a worker thread that is started by the
 * constructor. Acknowledgements, results and failures are sent back through
 * the "FireByDragonsoft_Server" dispatcher.
 * <p>
 * Access to {@link #currentTests} is guarded by synchronizing on the list
 * itself, both in {@link #handle(Object)} and in {@link #run()}.
 */
public class Client implements ScalableHandler, ClientController, Runnable {
	/**
	 *  Inbound stage, receives messages from the server.
	 */
	private ScalableStage clientStage;
	/**
	 *  Outbound dispatcher, sends messages to the server.
	 */ 
	public ScalableDispatcher serverDispatcher;
	
	/**
	 * Scratch directory into which each submission is copied before its test
	 * is executed. It is emptied before and after every test.
	 */
	private File workingDir;
	
	/**
	 * The queue of tests to be executed on this client box.
	 * Synchronize on this list before reading or modifying it.
	 */
	public ArrayList currentTests;
	
	/**
	 * Init network.
	 * <p>
	 * Registers this client on the inbound stage, obtains the outbound
	 * dispatcher and starts the worker thread that executes queued tests.
	 * The working directory is only checked with assertions, so it is
	 * validated only when the JVM runs with assertions enabled.
	 * 
	 * @param workingDirName the working directory for this client.
	 */
	public Client(String workingDirName){
		this.workingDir = new File(workingDirName);
		assert(this.workingDir.exists());
		assert(this.workingDir.isDirectory());
		assert(this.workingDir.canRead());
		assert(this.workingDir.canWrite());
		
		this.currentTests = new ArrayList();
		
		this.clientStage = ScalableFactory.getStage("FireByDragonsoft_Clients", true);
		this.clientStage.addHandler(this);

		this.serverDispatcher = ScalableFactory.getDispatcher("FireByDragonsoft_Server", true);
		
		Thread t = new Thread(this);
		t.start();
		
		// NOTE(review): presumably keeps pending submissions alive on the
		// server while they wait in the queue - confirm against
		// PendingSubmissionKeepAlive.
		new PendingSubmissionKeepAlive(this);
	}
	
	/**
	 * This is an implementation method of ScalableHandler interface
	 * Called by the stage when a submission is queued in the server.
	 * <p>
	 * The test is only appended to {@link #currentTests}; it is executed
	 * later by the worker thread. Objects that are not a
	 * {@link SubmissionTest} are dropped, because queuing one would make the
	 * cast in {@link #run()} fail and terminate the worker thread.
	 * 
	 * @param submissionTest the SubmissionTest to execute.
	 */
	public void handle(Object submissionTest) {
		System.out.println("handle called");
		
		assert submissionTest != null;
		assert submissionTest instanceof SubmissionTest;
		
		// Explicit guard for JVMs running without -ea (also rejects null).
		if(!(submissionTest instanceof SubmissionTest)) {
			System.out.println("Ignoring unexpected message: " + submissionTest);
			return;
		}
		
		synchronized(this.currentTests) {
			this.currentTests.add(submissionTest);
		}
	}
	
	/**
	 * Deletes the direct children of the given directory on a best-effort
	 * basis. The result of each delete is ignored, so entries that cannot be
	 * deleted (for example non-empty sub-directories) are left in place.
	 * 
	 * @param dir the directory to empty
	 */
	private void clearDir(File dir) {
		File[] files = dir.listFiles();
		if(files == null) {
			// listFiles() returns null when dir is not a directory or an I/O
			// error occurs; there is nothing to delete in that case.
			return;
		}
		for(int i = 0; i < files.length; i++) {
			files[i].delete();
		}
	}
	
	/**
	 * Copies every submitted file of the submission into the working
	 * directory, keeping only the file name of each source path.
	 * 
	 * @param s the submission whose files are copied
	 * @throws Exception if one of the files cannot be copied
	 */
	private void copySubmission(SubmissionObj s) throws Exception {
		assert s != null;
		
		List files = s.getSubmittedFiles();
		
		for(int i = 0; i < files.size(); i++) {
			File f = new File((String)files.get(i));
			File fOut = new File(this.workingDir.getAbsolutePath() + "/" + f.getName());
			copyFile(f, fOut);
		}
	}
	
	/**
	 * Empties the given directory (creating it first if necessary) and moves
	 * the content of the working directory into it by running a batch
	 * command inside the working directory. Files matching *.fire are
	 * removed instead of moved.
	 * 
	 * @param dir the directory that receives the working directory content
	 */
	private void moveWorkingDirTo(File dir) {
		assert dir != null;
		dir.mkdirs();
		clearDir(dir);
		
		// NOTE(review): the target path is inserted into the command line
		// unquoted; a path containing spaces or shell meta characters would
		// presumably break the command - confirm how BatchCommand parses it.
		BatchCommand copy = new BatchCommand("rm -f *.fire; mv -f * " + dir.getAbsolutePath(), 5*1000, null, null);
		try {
			copy.execute(null, this.workingDir, 1/*GB*/*1000/*MB*/*1000/*KB*/*1024/*B*/, BatchReturnType.exitCode);
		} catch(Exception e) {
			// should never occur
			e.printStackTrace();
		}
	}
	
	/**
	 * Copies the content of one file to another, replacing the target.
	 * Both streams are closed even when the copy fails.
	 * 
	 * @param in the file to read
	 * @param out the file to write
	 * @throws Exception if a file cannot be opened, read or written
	 */
	private void copyFile(File in, File out) throws Exception {
		FileInputStream source = new FileInputStream(in);
		try {
			FileOutputStream destination = new FileOutputStream(out);
			try {
				FileChannel sourceChannel = source.getChannel();
				FileChannel destinationChannel = destination.getChannel();
				
				// transferTo may move fewer bytes than requested, so repeat
				// until the whole file has been transferred.
				long size = sourceChannel.size();
				long position = 0;
				while(position < size) {
					long transferred = sourceChannel.transferTo(position, size - position, destinationChannel);
					if(transferred <= 0) {
						// no progress (e.g. source shrank); avoid spinning
						break;
					}
					position += transferred;
				}
			} finally {
				// closing a stream also closes its channel
				destination.close();
			}
		} finally {
			source.close();
		}
	}

	
	/**
	 * Hands the TestUnitResults to the server dispatcher so they are sent
	 * back to the server.
	 * 
	 * @param r the results to send
	 */
	public void post_unit_results(TestUnitResults r) {
		assert r != null;
		
		System.out.println("Sending unit results");
		
		this.serverDispatcher.dispatch(r);
	}
	
	/**
	 * Called to acknowledge the receipt of a new test.
	 * 
	 * @param ack the acknowledge object
	 */
	public void send_acknowledge(AcknowledgeMessage ack) {
		assert ack != null;
		
		System.out.println("Sending acknowledge");
		
		this.serverDispatcher.dispatch(ack);
	}
	
	/**
	 * Starts the program
	 * <p>
	 * The working directory is everything after the first '=' of the first
	 * argument; if the argument contains no '=', the whole argument is used.
	 * 
	 * @param args -WorkingDir=./
	 * @throws Exception if no argument is given or the client cannot start
	 */
	public static void main(String[] args) throws Exception {
		if(args.length == 0) {
			throw new IllegalArgumentException("Missing argument, expected: -WorkingDir=<path>");
		}
		assert args.length == 1;
		
		new Client(args[0].substring(args[0].indexOf('=') + 1));		
	}

	/**
	 * @return the working directory of this client
	 */
	public File get_working_directory() {
		return this.workingDir;
	}
	
	/**
	 * Execute tests as they are queued
	 * <p>
	 * Loops forever: takes the oldest queued test, acknowledges it, copies
	 * the submission into the freshly cleared working directory, executes
	 * the test and moves the working directory content to the test's save
	 * directory. A TesterException is dispatched to the server as is; any
	 * other exception is wrapped in an UnknownException. When the queue is
	 * empty the thread sleeps for one second before polling again.
	 */
	public void run() {
		while(true) {
			SubmissionTest test;
			
			synchronized(this.currentTests) {
				if(this.currentTests.size() > 0) {
					test = (SubmissionTest)this.currentTests.remove(0);
				} else {
					test = null;
				}
			}
			
			if(test != null) {
				assert test.get_submission() != null;
				
				System.out.println("Processing job from " + test.get_submission().getStudentUsername());
				
				AcknowledgeMessage ack = new AcknowledgeMessage(test.get_submission());
				this.send_acknowledge(ack);
				
				try {
					clearDir(this.workingDir);
					copySubmission(test.get_submission());
					test.execute(this);
					moveWorkingDirTo(test.getSaveDir()); // move unless there's an exception
				} catch(TesterException e) {
					System.out.println("Sending exception " + e.toString());
					
					this.serverDispatcher.dispatch(e);
				} catch(Exception e) {
					e.printStackTrace();
					this.serverDispatcher.dispatch(new UnknownException(test.get_submission(), e));
				} finally {
					// always leave an empty working directory for the next test
					clearDir(this.workingDir);
				}
			} else {
				try {
					Thread.sleep(1000);
				} catch(InterruptedException e) {
					// ignore: the worker has no shutdown protocol, it simply
					// polls the queue again
				}
			}
		}
	}

}
