From 225694f964556a795da2faac950e3f5c12a16f4d Mon Sep 17 00:00:00 2001 From: pkanavos Date: Wed, 20 Jun 2012 23:50:49 +0300 Subject: [PATCH] Replaced Merkle hash with MD5 for change checking Removed initial file indexing Added hash progress feedback Changes to Selective Tree check behavior --- .../SelectiveSynch/DirectoryRecord.cs | 27 +- trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs | 4 +- trunk/Pithos.Core.Test/BlockUpdaterTest.cs | 19 + trunk/Pithos.Core/Agents/BlockExtensions.cs | 41 +- trunk/Pithos.Core/Agents/Downloader.cs | 12 +- trunk/Pithos.Core/Agents/FileAgent.cs | 2 +- trunk/Pithos.Core/Agents/NetworkAgent.cs | 1 - trunk/Pithos.Core/Agents/Notifier.cs | 1 + trunk/Pithos.Core/Agents/PollAgent.cs | 425 ++------------------ trunk/Pithos.Core/Agents/StateTuple.cs | 86 ++++ trunk/Pithos.Core/Agents/StatusAgent.cs | 123 +++--- trunk/Pithos.Core/Agents/Uploader.cs | 14 +- trunk/Pithos.Core/FileState.cs | 6 +- trunk/Pithos.Core/Pithos.Core.csproj | 1 + trunk/Pithos.Core/PithosMonitor.cs | 2 +- trunk/Pithos.Network/WebExtensions.cs | 14 + 16 files changed, 296 insertions(+), 482 deletions(-) create mode 100644 trunk/Pithos.Core.Test/BlockUpdaterTest.cs create mode 100644 trunk/Pithos.Core/Agents/StateTuple.cs diff --git a/trunk/Pithos.Client.WPF/SelectiveSynch/DirectoryRecord.cs b/trunk/Pithos.Client.WPF/SelectiveSynch/DirectoryRecord.cs index 88627db..681c9f1 100644 --- a/trunk/Pithos.Client.WPF/SelectiveSynch/DirectoryRecord.cs +++ b/trunk/Pithos.Client.WPF/SelectiveSynch/DirectoryRecord.cs @@ -71,7 +71,13 @@ namespace Pithos.Client.WPF.SelectiveSynch public Uri Uri { get; set; } //public DirectoryInfo LocalInfo { get; set; } - DirectoryRecord _parent; + + private DirectoryRecord _parent; + public DirectoryRecord Parent + { + get { return _parent; } + private set { _parent = value; } + } public bool Added { get; set; } public bool Removed { get; set; } @@ -106,13 +112,13 @@ namespace Pithos.Client.WPF.SelectiveSynch if (value == _isChecked) return; - _isChecked = value; + _isChecked = value??false; //If the value is null both Added and Removed should be False - Added = _isChecked??false; - Removed = !(_isChecked??true); + Added = _isChecked.Value; + Removed = !(_isChecked.Value); - if (updateChildren && _isChecked.HasValue) + if (updateChildren ) this.Directories.Apply(c => c.SetIsChecked(_isChecked, true, false)); if (updateParent && _parent != null) @@ -123,6 +129,8 @@ namespace Pithos.Client.WPF.SelectiveSynch void VerifyCheckState() { + RaisePropertyChangedEventImmediately("IsChecked"); + return; bool? state = null; for (var i = 0; i < this.Directories.Count(); ++i) { @@ -149,7 +157,14 @@ namespace Pithos.Client.WPF.SelectiveSynch public List Directories { get { return _directories; } - set { _directories= value; } + set + { + _directories= value; + foreach (var dir in value) + { + dir.Parent = this; + } + } } public DirectoryRecord() diff --git a/trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs b/trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs index b4110ce..4df7b46 100644 --- a/trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs +++ b/trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs @@ -47,6 +47,7 @@ using System.Net; using System.Reflection; using System.Runtime.InteropServices; using System.ServiceModel; +using System.Threading; using System.Threading.Tasks; using System.Windows; using System.Windows.Controls.Primitives; @@ -983,7 +984,7 @@ namespace Pithos.Client.WPF { public void Notify(Notification notification) { - _events.Publish(notification); + TaskEx.Run(()=> _events.Publish(notification)); } @@ -1054,7 +1055,6 @@ namespace Pithos.Client.WPF { var account = Accounts.FirstOrDefault(acc => acc.AccountKey == message.Account.AccountKey); if (account != null) { - _pollAgent.SetSelectivePaths(account, message.Added, message.Removed); var added=monitor.UrisToFilePaths(message.Added); _pollAgent.SynchNow(added); } diff --git a/trunk/Pithos.Core.Test/BlockUpdaterTest.cs b/trunk/Pithos.Core.Test/BlockUpdaterTest.cs new file mode 100644 index 0000000..e9635bb --- /dev/null +++ b/trunk/Pithos.Core.Test/BlockUpdaterTest.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using NUnit.Framework; +using Pithos.Core.Agents; + +namespace Pithos.Core.Test +{ + [TestFixture] + class BlockUpdaterTest + { + [Test] + public void TestLargeFile() + { + //var updater = new BlockUpdater(); + } + } +} diff --git a/trunk/Pithos.Core/Agents/BlockExtensions.cs b/trunk/Pithos.Core/Agents/BlockExtensions.cs index 773754a..ac38e3d 100644 --- a/trunk/Pithos.Core/Agents/BlockExtensions.cs +++ b/trunk/Pithos.Core/Agents/BlockExtensions.cs @@ -41,6 +41,7 @@ #endregion using System; using System.Collections.Generic; +using System.Diagnostics; using System.Diagnostics.Contracts; using System.Linq; using System.Reflection; @@ -101,7 +102,7 @@ namespace Pithos.Core.Agents /// The file to hash /// The hash algorithm to use /// A hash value for the entire file. An empty string if the file does not exist. - public static string ComputeShortHash(this FileInfo info, HashAlgorithm hasher) + public static string ComputeShortHash(this FileInfo info, HashAlgorithm hasher,IStatusNotification notification) { if(info == null) throw new ArgumentNullException("info"); @@ -115,23 +116,49 @@ namespace Pithos.Core.Agents if (Log.IsDebugEnabled) Log.DebugFormat("Short Hashing [{0}] ",info.FullName); - using (var stream = info.Open(FileMode.Open, FileAccess.Read, FileShare.Read)) + var progress = new StatusNotification(""); + + using (var stream = new FileStream(info.FullName,FileMode.Open, FileAccess.Read, FileShare.Read,65536)) { - var hash = hasher.ComputeHash(stream); - var hashString = hash.ToHashString(); + var buffer = new byte[65536]; + int counter=0; + int bytesRead; + do + { + bytesRead = stream.Read(buffer, 0, 32768); + if (bytesRead > 0) + { + hasher.TransformBlock(buffer, 0, bytesRead, null, 0); + } + counter++; + if (counter % 100 == 0) + { + progress.Title = String.Format("Hashing {0:p} of {1}", stream.Position*1.0/stream.Length, + info.Name); + notification.Notify(progress); + } + } while (bytesRead > 0); + hasher.TransformFinalBlock(buffer, 0, 0); + var hash = hasher.Hash; + + progress.Title = String.Format("Hashed {0} ", info.Name); + notification.Notify(progress); + + var hashString = hash.ToHashString(); + return hashString; } } - public static string ComputeShortHash(this FileInfo info) + public static string ComputeShortHash(this FileInfo info,IStatusNotification notification) { if(info == null) throw new ArgumentNullException("info"); Contract.EndContractBlock(); - using (var hasher=HashAlgorithm.Create("sha1")) + using (var hasher=HashAlgorithm.Create("md5")) { - return ComputeShortHash(info,hasher); + return ComputeShortHash(info,hasher,notification); } } diff --git a/trunk/Pithos.Core/Agents/Downloader.cs b/trunk/Pithos.Core/Agents/Downloader.cs index e4b4968..d0419b8 100644 --- a/trunk/Pithos.Core/Agents/Downloader.cs +++ b/trunk/Pithos.Core/Agents/Downloader.cs @@ -35,7 +35,7 @@ namespace Pithos.Core.Agents //Download a file. - public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,TreeHash localTreeHash,CancellationToken cancellationToken) + public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken) { if (accountInfo == null) throw new ArgumentNullException("accountInfo"); @@ -56,7 +56,15 @@ namespace Pithos.Core.Agents if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken).ConfigureAwait(false)) return; + + TreeHash localTreeHash; + using (StatusNotification.GetNotifier("Hashing for Download {0}", "Hashed for Download {0}", Path.GetFileName(filePath))) + { + localTreeHash = Signature.CalculateTreeHashAsync(filePath, + accountInfo.BlockSize, + accountInfo.BlockHash, 1); + } var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); @@ -323,7 +331,7 @@ namespace Pithos.Core.Agents return true; var info = new FileInfo(localPath); - var shortHash = info.ComputeShortHash(); + var shortHash = info.ComputeShortHash(StatusNotification); //If the file is different from the stored state, we have a change if (localState.ShortHash != shortHash) return true; diff --git a/trunk/Pithos.Core/Agents/FileAgent.cs b/trunk/Pithos.Core/Agents/FileAgent.cs index 16ccd24..a29ef4e 100644 --- a/trunk/Pithos.Core/Agents/FileAgent.cs +++ b/trunk/Pithos.Core/Agents/FileAgent.cs @@ -611,7 +611,7 @@ namespace Pithos.Core.Agents using (StatusNotification.GetNotifier("Hashing {0}", "Finished Hashing {0}", info.Name)) { - var shortHash = info.ComputeShortHash(); + var shortHash = info.ComputeShortHash(StatusNotification); string merkleHash = info.CalculateHash(StatusKeeper.BlockSize, StatusKeeper.BlockHash); StatusKeeper.UpdateFileChecksum(path, shortHash, merkleHash); diff --git a/trunk/Pithos.Core/Agents/NetworkAgent.cs b/trunk/Pithos.Core/Agents/NetworkAgent.cs index cf6ce2b..47e0ddc 100644 --- a/trunk/Pithos.Core/Agents/NetworkAgent.cs +++ b/trunk/Pithos.Core/Agents/NetworkAgent.cs @@ -121,7 +121,6 @@ namespace Pithos.Core.Agents //Essentially it stops the poll agent to give priority to the network agent //Initially the event is signalled because we don't need to pause private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true); - private Agents.Selectives _selectives; private bool _pause; public AsyncManualResetEvent ProceedEvent diff --git a/trunk/Pithos.Core/Agents/Notifier.cs b/trunk/Pithos.Core/Agents/Notifier.cs index fc90cde..6e3fa43 100644 --- a/trunk/Pithos.Core/Agents/Notifier.cs +++ b/trunk/Pithos.Core/Agents/Notifier.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading.Tasks; namespace Pithos.Core { diff --git a/trunk/Pithos.Core/Agents/PollAgent.cs b/trunk/Pithos.Core/Agents/PollAgent.cs index 57f3a90..e304b1e 100644 --- a/trunk/Pithos.Core/Agents/PollAgent.cs +++ b/trunk/Pithos.Core/Agents/PollAgent.cs @@ -68,70 +68,6 @@ namespace Pithos.Core.Agents public IEnumerable Batch { get; set; } }*/ - [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")] - public class StateTuple - { - public string FilePath { get; private set; } - - public string MD5 { get; set; } - - public string L - { - get { return FileState==null?null:FileState.Checksum; } - } - - private string _c; - public string C - { - get { return _c; } - set { - _c = String.IsNullOrWhiteSpace(value) ? null : value; - } - } - - public string S - { - get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; } - } - - private FileSystemInfo _fileInfo; - private TreeHash _merkle; - - public FileSystemInfo FileInfo - { - get { return _fileInfo; } - set - { - _fileInfo = value; - FilePath = value.FullName; - } - } - - public FileState FileState { get; set; } - public ObjectInfo ObjectInfo{ get; set; } - - - public TreeHash Merkle - { - get { - return _merkle; - } - set { - _merkle = value; - C = _merkle.TopHash.ToHashString(); - } - } - - public StateTuple() { } - - public StateTuple(FileSystemInfo info) - { - FileInfo = info; - } - - - } - /// /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all @@ -212,20 +148,6 @@ namespace Pithos.Core.Agents HashSet _knwonContainers = new HashSet(); - - /*private void ProcessPollRequest(PollRequest request) - { - - if (request.Since == null && request.Batch != null) - { - _batchQueue.Enqueue(request.Batch); - _syncEvent.Set(); - } - else - { - PollRemoteFiles(request.Since).Wait(); - } - }*/ /// /// Start a manual synchronization /// @@ -308,7 +230,7 @@ namespace Pithos.Core.Agents finally { //Ensure polling is scheduled even in case of error - PollRemoteFiles(nextSince); + TaskEx.Run(()=>PollRemoteFiles(nextSince)); //_pollAction.Post(new PollRequest {Since = nextSince}); } } @@ -446,8 +368,7 @@ namespace Pithos.Core.Agents await _unPauseEvent.WaitAsync().ConfigureAwait(false); //Get the local files here - var agent = AgentLocator.Get(accountInfo.AccountPath); - //TODO: Pass the batch here as well + var agent = AgentLocator.Get(accountInfo.AccountPath); var files = await LoadLocalFileTuples(accountInfo, accountBatch); var states = FileState.Queryable.ToList(); @@ -469,7 +390,7 @@ namespace Pithos.Core.Agents await _unPauseEvent.WaitAsync().ConfigureAwait(false); //Set the Merkle Hash - SetMerkleHash(accountInfo, tuple); + //SetMerkleHash(accountInfo, tuple); await SyncSingleItem(accountInfo, tuple, agent, token).ConfigureAwait(false); @@ -512,7 +433,7 @@ namespace Pithos.Core.Agents } else { - tuple.Merkle = TaskEx.Run(()=> Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash)).Result; + tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1); //tuple.C=tuple.Merkle.TopHash.ToHashString(); } } @@ -528,36 +449,31 @@ namespace Pithos.Core.Agents localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName)); //Use the queue to retry locked file hashing - var fileQueue = new Queue(localInfos); - var hasher = MD5.Create(); + var fileQueue = new ConcurrentQueue(localInfos); + var results = new List>(); var backoff = 0; while (fileQueue.Count > 0) { - var file = fileQueue.Dequeue(); + FileSystemInfo file; + fileQueue.TryDequeue(out file); using (ThreadContext.Stacks["File"].Push(file.FullName)) { - /* - Signature.CalculateTreeHash(file, accountInfo.BlockSize, - accountInfo.BlockHash). - TopHash.ToHashString() - */ try { //Replace MD5 here, do the calc while syncing individual files string hash ; if (file is DirectoryInfo) - hash = MERKLE_EMPTY; + hash = MD5_EMPTY; else { //Wait in case the FileAgent has requested a Pause await _unPauseEvent.WaitAsync().ConfigureAwait(false); - using (StatusNotification.GetNotifier("Hashing {0}", "Finished hashing {0}", file.Name)) - using (var stream = (file as FileInfo).OpenRead()) - { - hash = hasher.ComputeHash(stream).ToHashString(); + using (StatusNotification.GetNotifier("Hashing {0}", "", file.Name)) + { + hash = ((FileInfo)file).ComputeShortHash(StatusNotification); backoff = 0; } } @@ -654,7 +570,8 @@ namespace Pithos.Core.Agents var targetPath = MoveForServerMove(accountInfo, tuple); StatusKeeper.SetFileState(targetPath, FileStatus.Modified,FileOverlayStatus.Modified, ""); - await NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath,tuple.Merkle, token) + + await NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, token) .ConfigureAwait(false); //updateRecord( L = S ) StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag, @@ -695,9 +612,9 @@ namespace Pithos.Core.Agents var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState, accountInfo.BlockSize, accountInfo.BlockHash, "Poll", isUnselectedRootFolder); - using (StatusNotification.GetNotifier("Uploading {0}", "Uploadied {0}", Path.GetFileName(localFilePath))) + using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}", Path.GetFileName(localFilePath))) { - await NetworkAgent.Uploader.UploadCloudFile(action, tuple.Merkle, token).ConfigureAwait(false); + await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false); } //updateRecord( S = C ) @@ -809,9 +726,9 @@ namespace Pithos.Core.Agents foreach (var file in files) { var fsInfo = file.Item1; - var fileHash = fsInfo is DirectoryInfo? MERKLE_EMPTY:file.Item2; + var fileHash = fsInfo is DirectoryInfo? MD5_EMPTY:file.Item2; - tuplesByPath[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, MD5 = fileHash}; + tuplesByPath[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, C=fileHash,MD5 = fileHash}; } foreach (var state in states) { @@ -823,7 +740,8 @@ namespace Pithos.Core.Agents else { var fsInfo = FileInfoExtensions.FromPath(state.FilePath); - tuplesByPath[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state}; + hashTuple = new StateTuple {FileInfo = fsInfo, FileState = state}; + tuplesByPath[state.FilePath] = hashTuple; } } @@ -849,11 +767,12 @@ namespace Pithos.Core.Agents else { var fsInfo = FileInfoExtensions.FromPath(filePath); - var tuple = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo}; - tuplesByPath[filePath] = tuple; - tuplesByID[objectInfo.UUID] = tuple; + hashTuple= new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo}; + tuplesByPath[filePath] = hashTuple; + tuplesByID[objectInfo.UUID] = hashTuple; } } + Debug.Assert(tuplesByPath.Values.All(t => t.HashesValid())); return tuplesByPath.Values; } @@ -896,102 +815,12 @@ namespace Pithos.Core.Agents } readonly AccountsDifferencer _differencer = new AccountsDifferencer(); - private Dictionary> _selectiveUris = new Dictionary>(); private bool _pause; - private static string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; - - /// - /// Deletes local files that are not found in the list of cloud files - /// - /// - /// - private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable cloudFiles) - { - if (accountInfo == null) - throw new ArgumentNullException("accountInfo"); - if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) - throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo"); - if (cloudFiles == null) - throw new ArgumentNullException("cloudFiles"); - Contract.EndContractBlock(); - - var deletedFiles = new List(); - foreach (var objectInfo in cloudFiles) - { - if (Log.IsDebugEnabled) - Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri); - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath); - if (Log.IsDebugEnabled) - Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri); - if (item.Exists) - { - if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly) - { - item.Attributes = item.Attributes & ~FileAttributes.ReadOnly; - - } - - - Log.DebugFormat("Deleting {0}", item.FullName); - - var directory = item as DirectoryInfo; - if (directory != null) - directory.Delete(true); - else - item.Delete(); - Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri); - DateTime lastDate; - _lastSeen.TryRemove(item.FullName, out lastDate); - deletedFiles.Add(item); - } - StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted"); - } - Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count); - StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), - TraceLevel.Info); - - } - - private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable cloudFiles) - { -//Only consider files that are not being modified, ie they are in the Unchanged state - var deleteCandidates = FileState.Queryable.Where(state => - state.FilePath.StartsWith(accountInfo.AccountPath) - && state.FileStatus == FileStatus.Unchanged).ToList(); - - - //TODO: filesToDelete must take into account the Others container - var filesToDelete = (from deleteCandidate in deleteCandidates - let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath) - let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath) - where - !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath) - select localFile).ToList(); + + const string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + const string MD5_EMPTY = "d41d8cd98f00b204e9800998ecf8427e"; - //Set the status of missing files to Conflict - foreach (var item in filesToDelete) - { - //Try to acquire a gate on the file, to take into account files that have been dequeued - //and are being processed - using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting)) - { - if (gate.Failed) - continue; - StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted, - "Local file missing from server"); - } - } - UpdateStatus(PithosStatus.HasConflicts); - StatusNotification.NotifyConflicts(filesToDelete, - String.Format( - "{0} local files are missing from Pithos, possibly because they were deleted", - filesToDelete.Count)); - StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count), - TraceLevel.Info); - } - private void ReportConflictForMismatch(string localFilePath) { if (String.IsNullOrWhiteSpace(localFilePath)) @@ -1006,137 +835,6 @@ namespace Pithos.Core.Agents } - - /// - /// Creates a Sync action for each changed server file - /// - /// - /// - /// - private IEnumerable ChangesToActions(AccountInfo accountInfo, IEnumerable changes) - { - if (changes == null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = FileAgent.GetFileAgent(accountInfo); - - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in changes) - { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - //If a directory object already exists, we may need to sync it - if (fileAgent.Exists(relativePath)) - { - var localFile = fileAgent.GetFileSystemInfo(relativePath); - //We don't need to sync directories - if (objectInfo.IsDirectory && localFile is DirectoryInfo) - continue; - using (new SessionScope(FlushAction.Never)) - { - var state = StatusKeeper.GetStateByFilePath(localFile.FullName); - _lastSeen[localFile.FullName] = DateTime.Now; - //Common files should be checked on a per-case basis to detect differences, which is newer - - yield return new CloudAction(accountInfo, CloudActionType.MustSynch, - localFile, objectInfo, state, accountInfo.BlockSize, - accountInfo.BlockHash,"Poll Changes"); - } - } - else - { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Changes"); - } - } - } - - /// - /// Creates a Local Move action for each moved server file - /// - /// - /// - /// - private IEnumerable MovesToActions(AccountInfo accountInfo, IEnumerable moves) - { - if (moves == null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = FileAgent.GetFileAgent(accountInfo); - - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in moves) - { - var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName); - //If the previous file already exists, we can execute a Move operation - if (fileAgent.Exists(previousRelativepath)) - { - var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath); - using (new SessionScope(FlushAction.Never)) - { - var state = StatusKeeper.GetStateByFilePath(previousFile.FullName); - _lastSeen[previousFile.FullName] = DateTime.Now; - - //For each moved object we need to move both the local file and update - yield return new CloudAction(accountInfo, CloudActionType.RenameLocal, - previousFile, objectInfo, state, accountInfo.BlockSize, - accountInfo.BlockHash,"Poll Moves"); - //For modified files, we need to download the changes as well - if (objectInfo.X_Object_Hash != objectInfo.PreviousHash) - yield return new CloudDownloadAction(accountInfo,objectInfo, "Poll Moves"); - } - } - //If the previous file does not exist, we need to download it in the new location - else - { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves"); - } - } - } - - - /// - /// Creates a download action for each new server file - /// - /// - /// - /// - private IEnumerable CreatesToActions(AccountInfo accountInfo, IEnumerable creates) - { - if (creates == null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = FileAgent.GetFileAgent(accountInfo); - - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in creates) - { - if (Log.IsDebugEnabled) - Log.DebugFormat("[NEW INFO] {0}",objectInfo.Uri); - - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - - //If the object already exists, we should check before uploading or downloading - if (fileAgent.Exists(relativePath)) - { - var localFile= fileAgent.GetFileSystemInfo(relativePath); - var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName); - yield return new CloudAction(accountInfo, CloudActionType.MustSynch, - localFile, objectInfo, state, accountInfo.BlockSize, - accountInfo.BlockHash,"Poll Creates"); - } - else - { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Creates"); - } - - } - } - /// /// Notify the UI to update the visual status /// @@ -1182,73 +880,6 @@ namespace Pithos.Core.Agents SnapshotDifferencer differencer; _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer); } - - public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed) - { - AbortRemovedPaths(accountInfo,removed); - //DownloadNewPaths(accountInfo,added); - } - -/* - private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added) - { - var client = new CloudFilesClient(accountInfo); - foreach (var folderUri in added) - { - try - { - - string account; - string container; - var segmentsCount = folderUri.Segments.Length; - //Is this an account URL? - if (segmentsCount < 3) - continue; - //Is this a container or folder URL? - if (segmentsCount == 3) - { - account = folderUri.Segments[1].TrimEnd('/'); - container = folderUri.Segments[2].TrimEnd('/'); - } - else - { - account = folderUri.Segments[2].TrimEnd('/'); - container = folderUri.Segments[3].TrimEnd('/'); - } - IList items; - if (segmentsCount > 3) - { - //List folder - var folder = String.Join("", folderUri.Segments.Splice(4)); - items = client.ListObjects(account, container, folder); - } - else - { - //List container - items = client.ListObjects(account, container); - } - var actions = CreatesToActions(accountInfo, items); - foreach (var action in actions) - { - NetworkAgent.Post(action); - } - } - catch (Exception exc) - { - Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc); - } - } - - //Need to get a listing of each of the URLs, then post them to the NetworkAgent - //CreatesToActions(accountInfo,) - -/* NetworkAgent.Post();#1# - } -*/ - - private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed) - { - /*this.NetworkAgent.*/ - } + } } diff --git a/trunk/Pithos.Core/Agents/StateTuple.cs b/trunk/Pithos.Core/Agents/StateTuple.cs new file mode 100644 index 0000000..948f029 --- /dev/null +++ b/trunk/Pithos.Core/Agents/StateTuple.cs @@ -0,0 +1,86 @@ +using System; +using System.Diagnostics; +using System.IO; +using Pithos.Interfaces; +using Pithos.Network; + +namespace Pithos.Core.Agents +{ + [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")] + public class StateTuple + { + public string FilePath { get; private set; } + + public string MD5 { get; set; } + + public string L + { + get + { + var hash = FileState.NullSafe(f => f.ShortHash); + return String.IsNullOrWhiteSpace(hash) ? null : hash; + } + } + + private string _c; + public string C + { + get { return _c; } + set { + _c = String.IsNullOrWhiteSpace(value) ? null : value; + } + } + + public string S + { + get + { + var etag = ObjectInfo.NullSafe(o => o.ETag); + return String.IsNullOrWhiteSpace(etag) ? null : etag; + } + } + + private FileSystemInfo _fileInfo; + private TreeHash _merkle; + + public FileSystemInfo FileInfo + { + get { return _fileInfo; } + set + { + _fileInfo = value; + FilePath = value.FullName; + } + } + + public FileState FileState { get; set; } + public ObjectInfo ObjectInfo{ get; set; } + + + public TreeHash Merkle + { + get { + return _merkle; + } + set { + _merkle = value; + C = _merkle.MD5; + } + } + + public StateTuple() { } + + public StateTuple(FileSystemInfo info) + { + FileInfo = info; + } + + public bool HashesValid() + { + return ( (C==null || C.Length == 32) + && (L==null || L.Length == 32) + && (S==null || S.Length == 32) + ); + } + } +} \ No newline at end of file diff --git a/trunk/Pithos.Core/Agents/StatusAgent.cs b/trunk/Pithos.Core/Agents/StatusAgent.cs index c65ccb6..595ccbe 100644 --- a/trunk/Pithos.Core/Agents/StatusAgent.cs +++ b/trunk/Pithos.Core/Agents/StatusAgent.cs @@ -220,88 +220,91 @@ namespace Pithos.Core.Agents { _persistenceAgent.Stop(); } - + public void ProcessExistingFiles(IEnumerable existingFiles) { - if(existingFiles ==null) + if (existingFiles == null) throw new ArgumentNullException("existingFiles"); Contract.EndContractBlock(); - + //Find new or matching files with a left join to the stored states - var fileStates = FileState.Queryable; - var currentFiles=from file in existingFiles - join state in fileStates on file.FullName.ToLower() equals state.FilePath.ToLower() into gs - from substate in gs.DefaultIfEmpty() - select new {File = file, State = substate}; + var fileStates = FileState.Queryable.ToList(); + var currentFiles = from file in existingFiles + join state in fileStates on file.FullName.ToLower() equals state.FilePath.ToLower() into + gs + from substate in gs.DefaultIfEmpty() + select Tuple.Create(file, substate); //To get the deleted files we must get the states that have no corresponding //files. //We can't use the File.Exists method inside a query, so we get all file paths from the states var statePaths = (from state in fileStates - select new {state.Id, state.FilePath}).ToList(); + select new {state.Id, state.FilePath}).ToList(); //and check each one - var missingStates= (from path in statePaths - where !File.Exists(path.FilePath) && !Directory.Exists(path.FilePath) - select path.Id).ToList(); + var missingStates = (from path in statePaths + where !File.Exists(path.FilePath) && !Directory.Exists(path.FilePath) + select path.Id).ToList(); //Finally, retrieve the states that correspond to the deleted files - var deletedFiles = from state in fileStates - where missingStates.Contains(state.Id) - select new { File = default(FileInfo), State = state }; + var deletedFiles = from state in fileStates + where missingStates.Contains(state.Id) + select Tuple.Create(default(FileInfo), state); var pairs = currentFiles.Union(deletedFiles).ToList(); - - using (var shortHasher = HashAlgorithm.Create("sha1")) + + i = 1; + var total = pairs.Count; + foreach (var pair in pairs) { - var i = 0; - var total = pairs.Count; - foreach (var pair in pairs) - { - using (StatusNotification.GetNotifier("Indexing file {0} of {1}", "Indexed file {0} of {1} ", ++i, total)) - { - var fileState = pair.State; - var file = pair.File; - if (fileState == null) - { - //This is a new file - var createState = FileState.CreateFor(file); - _persistenceAgent.Post(createState.Create); - } - else if (file == null) - { - //This file was deleted while we were down. We should mark it as deleted - //We have to go through UpdateStatus here because the state object we are using - //was created by a different ORM session. - _persistenceAgent.Post(() => UpdateStatusDirect(fileState.Id, FileStatus.Deleted)); - } - else - { - //This file has a matching state. Need to check for possible changes - //To check for changes, we use the cheap (in CPU terms) SHA1 algorithm - //on the entire file. + ProcessFile(total, pair); + } + } - var hashString = file.ComputeShortHash(shortHasher); + int i = 1; + private void ProcessFile(int total, Tuple pair) + { + var idx = Interlocked.Increment(ref i); + using (StatusNotification.GetNotifier("Indexing file {0} of {1}", "Indexed file {0} of {1} ", idx, total)) + { + var fileState = pair.Item2; + var file = pair.Item1; + if (fileState == null) + { + //This is a new file + var createState = FileState.CreateFor(file,StatusNotification); + _persistenceAgent.Post(createState.Create); + } + else if (file == null) + { + //This file was deleted while we were down. We should mark it as deleted + //We have to go through UpdateStatus here because the state object we are using + //was created by a different ORM session. + _persistenceAgent.Post(() => UpdateStatusDirect((Guid) fileState.Id, FileStatus.Deleted)); + } + else + { + //This file has a matching state. Need to check for possible changes + //To check for changes, we use the cheap (in CPU terms) MD5 algorithm + //on the entire file. + var hashString = file.ComputeShortHash(StatusNotification); + Debug.Assert(hashString.Length==32); - //TODO: Need a way to attach the hashes to the filestate so we don't - //recalculate them each time a call to calculate has is made - //We can either store them to the filestate or add them to a - //dictionary - //If the hashes don't match the file was changed - if (fileState.ShortHash != hashString) - { - _persistenceAgent.Post(() => UpdateStatusDirect(fileState.Id, FileStatus.Modified)); - } - } + //TODO: Need a way to attach the hashes to the filestate so we don't + //recalculate them each time a call to calculate has is made + //We can either store them to the filestate or add them to a + //dictionary + + //If the hashes don't match the file was changed + if (fileState.ShortHash != hashString) + { + _persistenceAgent.Post(() => UpdateStatusDirect((Guid) fileState.Id, FileStatus.Modified)); } } } - - } - private int UpdateStatusDirect(Guid id, FileStatus status) @@ -359,7 +362,7 @@ namespace Pithos.Core.Agents var affected = command.ExecuteNonQuery(); if (affected == 0) { - var createdState = FileState.CreateFor(FileInfoExtensions.FromPath(path)); + var createdState = FileState.CreateFor(FileInfoExtensions.FromPath(path), StatusNotification); createdState.FileStatus = status; createdState.Create(); } @@ -399,7 +402,7 @@ namespace Pithos.Core.Agents var affected = command.ExecuteNonQuery(); if (affected == 0) { - var createdState=FileState.CreateFor(FileInfoExtensions.FromPath(absolutePath)); + var createdState = FileState.CreateFor(FileInfoExtensions.FromPath(absolutePath), StatusNotification); createdState.FileStatus = fileStatus; createdState.OverlayStatus = overlayStatus; createdState.ConflictReason = conflictReason; @@ -811,7 +814,7 @@ namespace Pithos.Core.Agents var fileInfo = FileInfoExtensions.FromPath(path); using (new SessionScope()) { - var newState = FileState.CreateFor(fileInfo); + var newState = FileState.CreateFor(fileInfo,StatusNotification); newState.FileStatus=FileStatus.Missing; _persistenceAgent.PostAndAwait(newState.CreateAndFlush).Wait(); } diff --git a/trunk/Pithos.Core/Agents/Uploader.cs b/trunk/Pithos.Core/Agents/Uploader.cs index 6a02871..b8e1ec7 100644 --- a/trunk/Pithos.Core/Agents/Uploader.cs +++ b/trunk/Pithos.Core/Agents/Uploader.cs @@ -33,7 +33,7 @@ namespace Pithos.Core.Agents _cts.Cancel(); }*/ - public async Task UploadCloudFile(CloudUploadAction action,TreeHash localTreeHash,CancellationToken cancellationToken) + public async Task UploadCloudFile(CloudUploadAction action,CancellationToken cancellationToken) { if (action == null) throw new ArgumentNullException("action"); @@ -47,6 +47,15 @@ namespace Pithos.Core.Agents var fileInfo = action.LocalFile; + + TreeHash localTreeHash; + using (StatusNotification.GetNotifier("Hashing for Upload {0}", "Hashed for Upload {0}", fileInfo.Name)) + { + localTreeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, + action.AccountInfo.BlockSize, + action.AccountInfo.BlockHash, 1); + } + if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase)) return; @@ -296,7 +305,7 @@ namespace Pithos.Core.Agents var client = new CloudFilesClient(accountInfo); //Send the hashmap to the server var missingHashes = await client.PutHashMap(account, container, url, treeHash).ConfigureAwait(false); - ReportUploadProgress(fileInfo.Name, block++, 0, missingHashes.Count, fileInfo.Length); + ReportUploadProgress(fileInfo.Name, block, 0, missingHashes.Count, fileInfo.Length); //If the server returns no missing hashes, we are done client.UploadProgressChanged += (sender, args) => @@ -306,6 +315,7 @@ namespace Pithos.Core.Agents while (missingHashes.Count > 0) { + block = 0; if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false)) return; diff --git a/trunk/Pithos.Core/FileState.cs b/trunk/Pithos.Core/FileState.cs index 833881c..3995cac 100644 --- a/trunk/Pithos.Core/FileState.cs +++ b/trunk/Pithos.Core/FileState.cs @@ -435,7 +435,7 @@ namespace Pithos.Core }, null); } - public static FileState CreateFor(FileSystemInfo info) + public static FileState CreateFor(FileSystemInfo info,IStatusNotification notification) { if(info==null) throw new ArgumentNullException("info"); @@ -451,8 +451,8 @@ namespace Pithos.Core Id = Guid.NewGuid() }; - - var shortHash = ((FileInfo)info).ComputeShortHash(); + + var shortHash = ((FileInfo)info).ComputeShortHash(notification); var fileState = new FileState { FilePath = info.FullName, diff --git a/trunk/Pithos.Core/Pithos.Core.csproj b/trunk/Pithos.Core/Pithos.Core.csproj index 7542e01..31436d3 100644 --- a/trunk/Pithos.Core/Pithos.Core.csproj +++ b/trunk/Pithos.Core/Pithos.Core.csproj @@ -403,6 +403,7 @@ + diff --git a/trunk/Pithos.Core/PithosMonitor.cs b/trunk/Pithos.Core/PithosMonitor.cs index 589dc33..1bc522c 100644 --- a/trunk/Pithos.Core/PithosMonitor.cs +++ b/trunk/Pithos.Core/PithosMonitor.cs @@ -254,7 +254,7 @@ namespace Pithos.Core LoadSelectivePaths(); - await IndexLocalFiles().ConfigureAwait(false); + //await IndexLocalFiles().ConfigureAwait(false); StartWatcherAgent(); diff --git a/trunk/Pithos.Network/WebExtensions.cs b/trunk/Pithos.Network/WebExtensions.cs index 6676d65..53612d6 100644 --- a/trunk/Pithos.Network/WebExtensions.cs +++ b/trunk/Pithos.Network/WebExtensions.cs @@ -10,6 +10,7 @@ namespace Pithos.Network { public static class WebExtensions { + public static string ReadToEnd(this HttpWebResponse response) { using (var stream = response.GetResponseStream()) @@ -49,5 +50,18 @@ namespace Pithos.Network return new StringReader(body); } } + + + public static TOut NullSafe(this TIn obj, Func memberAction) + { + //Note we should not use obj != null because it can not test value types and also + //compiler has to lift the type to a nullable type for doing the comparision with null. + return (!EqualityComparer.Default.Equals(obj, default(TIn))) ? memberAction(obj) : default(TOut); + + } + + + + } } -- 1.7.10.4