22
33open System
44open System.IO
5+ open System.Net .Http
56open System.Security .Cryptography
67open System.Text
7-
88open System.Threading
9- open Emulsion. Settings
9+
1010open Serilog
11+ open SimpleBase
12+
13+ open Emulsion.Settings
1114
1215type DownloadRequest = {
1316 Uri: Uri
1417 CacheKey: string
1518 Size: uint64
1619}
1720
21+ module Base58 =
22+ /// Suggested by @ttldtor .
23+ let M4N71KR = Base58( Base58Alphabet " 123456789qwertyuiopasdfghjkzxcvbnmQWERTYUPASDFGHJKLZXCVBNM" )
24+
1825module FileCache =
19- let FileName ( sha256 : SHA256 , cacheKey : string ): string =
26+ let EncodeFileName ( sha256 : SHA256 , cacheKey : string ): string =
2027 cacheKey
2128 |> Encoding.UTF8.GetBytes
2229 |> sha256.ComputeHash
23- |> Convert.ToBase64String
30+ |> Base58.M4N71KR.Encode
31+
32+ let DecodeFileNameToSha256Hash ( fileName : string ): byte [] =
33+ ( Base58.M4N71KR.Decode fileName) .ToArray()
2434
25- // TODO: Total cache limit
2635type FileCache ( logger : ILogger ,
2736 settings: FileCacheSettings,
37+ httpClientFactory: IHttpClientFactory,
2838 sha256: SHA256) =
2939
3040 let getFilePath ( cacheKey : string ) =
31- Path.Combine( settings.Directory, FileCache.FileName ( sha256, cacheKey))
41+ Path.Combine( settings.Directory, FileCache.EncodeFileName ( sha256, cacheKey))
3242
3343 let getFromCache ( cacheKey : string ) = async {
3444 let path = getFilePath cacheKey
3545 return
3646 if File.Exists path then
37- Some( new FileStream( path, FileMode.Open, FileAccess.Read, FileShare.Delete))
47+ Some( new FileStream( path, FileMode.Open, FileAccess.Read, FileShare.Read ||| FileShare. Delete))
3848 else
3949 None
4050 }
4151
42- // TODO: Check total item size, too
52+ let assertCacheValid () = async {
53+ Directory.EnumerateFileSystemEntries settings.Directory
54+ |> Seq.iter( fun entry ->
55+ let entryName = Path.GetFileName entry
56+
57+ if not <| File.Exists entry
58+ then failwith $" Cache directory invalid: contains a subdirectory: \" {entryName}\" ."
59+
60+ let hash = FileCache.DecodeFileNameToSha256Hash entryName
61+ if hash.Length <> sha256.HashSize / 8
62+ then failwith (
63+ $" Cache directory invalid: contains entry \" {entryName}\" which doesn't correspond to a " +
64+ " base58-encoded SHA-256 hash."
65+ )
66+ )
67+ }
68+
4369 let ensureFreeCache size = async {
44- if size > settings.FileSizeLimitBytes then
70+ if size > settings.FileSizeLimitBytes || size > settings.TotalCacheSizeLimitBytes then
4571 return false
4672 else
47- return failwith " TODO: Sanity check that cache only has files"
73+ do ! assertCacheValid()
74+
75+ let allEntries =
76+ Directory.EnumerateFileSystemEntries settings.Directory
77+ |> Seq.map FileInfo
78+
79+ // Now, sort the entries from newest to oldest, and start deleting if required at a point when we understand
80+ // that there are too much files:
81+ let entriesByPriority =
82+ allEntries
83+ |> Seq.sortByDescending( fun info -> info.LastWriteTimeUtc)
84+ |> Seq.toArray
85+
86+ let mutable currentSize = 0 UL
87+ for info in entriesByPriority do
88+ currentSize <- currentSize + Checked.uint64 info.Length
89+ if currentSize + size > settings.TotalCacheSizeLimitBytes then
90+ logger.Information( " Deleting a cache item \" {FileName}\" ({Size} bytes)" , info.Name, info.Length)
91+ info.Delete()
92+
93+ return true
4894 }
4995
50- let download uri : Async < Stream > = async {
51- return failwithf " TODO: Download the URI and return a stream"
96+ let download ( uri : Uri ): Async < Stream > = async {
97+ let! ct = Async.CancellationToken
98+
99+ use client = httpClientFactory.CreateClient()
100+ let! response = Async.AwaitTask <| client.GetAsync( uri, ct)
101+ return ! Async.AwaitTask <| response.EnsureSuccessStatusCode() .Content.ReadAsStreamAsync()
52102 }
53103
54104 let downloadIntoCacheAndGet uri cacheKey : Async < Stream > = async {
@@ -57,41 +107,43 @@ type FileCache(logger: ILogger,
57107 let path = getFilePath cacheKey
58108 logger.Information( " Saving {Uri} to path {Path}…" , uri, path)
59109
60- use cachedFile = new FileStream( path, FileMode.Open, FileAccess.Write, FileShare.None)
61- do ! Async.AwaitTask( stream.CopyToAsync( cachedFile, ct))
62- logger.Information( " Download successful: \" {Uri}\" to \" {Path}\" ." )
110+ do ! async { // to limit the cachedFile scope
111+ use cachedFile = new FileStream( path, FileMode.CreateNew, FileAccess.Write, FileShare.None)
112+ do ! Async.AwaitTask( stream.CopyToAsync( cachedFile, ct))
113+ logger.Information( " Download successful: \" {Uri}\" to \" {Path}\" ." )
114+ }
63115
64116 let! file = getFromCache cacheKey
65117 return upcast Option.get file
66118 }
67119
68120 let cancellation = new CancellationTokenSource()
69- let processRequest request : Async < Stream option > = async {
121+ let processRequest request : Async < Stream > = async {
70122 logger.Information( " Cache lookup for content {Uri} (cache key {CacheKey})" , request.Uri, request.CacheKey)
71123 match ! getFromCache request.CacheKey with
72124 | Some content ->
73125 logger.Information( " Cache hit for content {Uri} (cache key {CacheKey})" , request.Uri, request.CacheKey)
74- return Some content
126+ return content
75127 | None ->
76128 logger.Information( " No cache hit for content {Uri} (cache key {CacheKey}), will download" , request.Uri, request.CacheKey)
77129 let! shouldCache = ensureFreeCache request.Size
78130 if shouldCache then
79131 logger.Information( " Resource {Uri} (cache key {CacheKey}, {Size} bytes) will fit into cache, caching" , request.Uri, request.CacheKey, request.Size)
80132 let! result = downloadIntoCacheAndGet request.Uri request.CacheKey
81133 logger.Information( " Resource {Uri} (cache key {CacheKey}, {Size} bytes) downloaded" , request.Uri, request.CacheKey, request.Size)
82- return Some result
134+ return result
83135 else
84136 logger.Information( " Resource {Uri} (cache key {CacheKey}) won't fit into cache, directly downloading" , request.Uri, request.CacheKey)
85137 let! result = download request.Uri
86- return Some result
138+ return result
87139 }
88140
89141 let rec processLoop ( processor : MailboxProcessor < _ * AsyncReplyChannel < _ >>) = async {
90142 while true do
91143 let! request , replyChannel = processor.Receive()
92144 try
93145 let! result = processRequest request
94- replyChannel.Reply result
146+ replyChannel.Reply( Some result)
95147 with
96148 | ex ->
97149 logger.Error( ex, " Exception while processing the file download queue" )
0 commit comments