什么是 folly::Future？
folly::Future 是 Meta 开源的 C++ 库 Folly 中提供的一个异步编程工具。它类似于其他语言中的 Future 或 Promise 概念，用于表示一个可能在未来某个时间点完成的异步操作的结果。通过使用 folly::Future，开发者可以更方便地处理异步任务，避免回调地狱，提高代码的可读性和可维护性。
相比标准库中的 std::future，folly::Future 提供了更多的功能和更灵活的接口，例如链式调用、错误处理等。
基本用法 1. 创建 Future 通过 Promise 创建 Future 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 folly::Future<int > asyncOperation () { folly::Promise<int > promise; auto future = promise.getFuture (); std::thread ([p = std::move (promise)]() mutable { std::this_thread::sleep_for (std::chrono::seconds (1 )); if (rand () % 2 == 0 ) { p.setValue (42 ); } else { p.setException (std::domain_error ("Simulated error" )); } }).detach (); return future; }
// Creating an already-completed Future

/// Returns a Future that is already fulfilled with the value 100.
folly::Future<int> completedFuture() {
  return folly::makeFuture<int>(100);
}

/// Returns a Future that is already failed with
/// std::runtime_error("Error occurred").
folly::Future<int> failedFuture() {
  const std::runtime_error failure("Error occurred");
  return folly::makeFuture<int>(failure);
}
2. 等待 Future 阻塞等待 Future 完成并获取结果 1 2 3 4 5 6 7 8 9 void waitForFutureResult () { auto fut = asyncOperation (); try { int result = std::move (fut).get (); std::cout << "Future completed with value: " << result << std::endl; } catch (const std::exception& e) { std::cerr << "Future failed with error: " << e.what () << std::endl; } }
阻塞等待 Future 完成 1 2 3 4 5 6 7 8 9 void waitForFutureFinish () { auto fut = asyncOperation (); try { std::move (fut).wait (); std::cout << "Future completed" << std::endl; } catch (const std::exception& e) { std::cerr << "Future failed with error: " << e.what () << std::endl; } }
3. 回调函数 在 Future 上追加回调函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 folly::Future<int > registerCallback () { auto fut = asyncOperation (); auto fut2 = std::move (fut) .thenValue ([](int value) { std::cout << "Future completed with value: " << value << std::endl; return value * 2 ; }) .thenError <std::domain_error>([](std::domain_error &&e) { std::cerr << "Future failed with domain_error: " << e.what () << std::endl; return -1 ; }) .thenError <std::exception>([](std::exception &&e) { std::cerr << "Future failed with exception: " << e.what () << std::endl; return -1 ; }); return fut2; }
追加多个回调函数，实现链式调用
1 2 3 4 5 6 7 8 9 10 11 12 13 void chainFutures () { asyncOperation () .thenValue ([](int value) { return value * 2 ; }) .thenValue ([](int value) { return std::to_string (value); }) .thenValue ([](const std::string &str) { std::cout << "Final result: " << str << std::endl; }) .wait (); }
控制回调函数的执行线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void futureWithExecutor () { auto main_executor = folly::getGlobalCPUExecutor ().get (); folly::CPUThreadPoolExecutor pool_executor (4 ) ; asyncOperation () .via (main_executor) .thenValue ([](int value) { std::cout << "Callback running on main_executor thread, value: " << value << std::endl; return value + 1 ; }) .via (&pool_executor) .thenValue ([](int value) { std::cout << "Callback running on pool_executor thread, value: " << value << std::endl; return value + 1 ; }) .wait (); }
4. 组合多个 Future 等待多个 Future 完成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void combineFutures () { auto fut1 = asyncOperation (); auto fut2 = completedFuture (); auto fut3 = failedFuture (); auto combinedFuture = folly::collectAll (std::array<folly::Future<int >, 3 >{std::move (fut1), std::move (fut2), std::move (fut3)}) .via (folly::getGlobalCPUExecutor ().get ()) .thenValue ([](auto &&results) { int sum = 0 ; for (folly::Try<int > &result : results) { if (result.hasValue ()) { sum += result.value (); } if (result.hasException ()) { std::cerr << "One of the futures failed: " << result.exception ().what () << std::endl; } } return sum; }) .thenValue ([](int sum) { std::cout << "Sum of successful futures: " << sum << std::endl; return sum; }); combinedFuture.wait (); }
等待第一个完成的 Future 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void raceFutures () { auto fut1 = asyncOperation (); auto fut2 = completedFuture (); auto fut3 = failedFuture (); auto raceFuture = folly::collectAny (std::array<folly::Future<int >, 3 >{std::move (fut1), std::move (fut2), std::move (fut3)}) .via (folly::getGlobalCPUExecutor ().get ()) .thenValue ([](std::pair<size_t , folly::Try<int >> result) { if (result.second.hasValue ()) { std::cout << "First completed future value: " << result.second.value () << std::endl; } else if (result.second.hasException ()) { std::cerr << "First completed future failed: " << result.second.exception ().what () << std::endl; } return result; }); raceFuture.wait (); auto fut4 = failedFuture (); auto raceFuture2 = folly::collectAny (std::array<folly::Future<int >, 1 >{std::move (fut4)}) .via (folly::getGlobalCPUExecutor ().get ()) .thenValue ([](std::pair<size_t , folly::Try<int >> result) { if (result.second.hasValue ()) { std::cout << "First completed future value: " << result.second.value () << std::endl; } else if (result.second.hasException ()) { std::cerr << "First completed future failed: " << result.second.exception ().what () << std::endl; } return result; }); raceFuture2. wait (); }
收集成功的 Future 结果 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void collectSuccessfulFutures () { auto fut1 = asyncOperation (); auto fut2 = completedFuture (); auto fut3 = failedFuture (); auto collectedFuture = folly::collect (std::array<folly::Future<int >, 3 >{std::move (fut1), std::move (fut2), std::move (fut3)}) .via (folly::getGlobalCPUExecutor ().get ()) .thenValue ([](std::vector<int > results) { int sum = 0 ; for (int value : results) { sum += value; } std::cout << "Sum of successful futures: " << sum << std::endl; return sum; }); collectedFuture.wait (); }
5. 复杂控制流 循环重试直到成功 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void retryUntilSuccess (std::function<folly::Future<int >()> operation, int maxRetries) { std::function<folly::Future<int >(int )> attempt = [&](int retriesLeft) -> folly::Future<int > { return operation ().thenError <std::exception>([&](std::exception &&e) { if (retriesLeft > 0 ) { std::cout << "Operation failed: " << e.what () << ", retries left: " << retriesLeft << std::endl; return attempt (retriesLeft - 1 ); } else { std::cerr << "Operation failed after max retries: " << e.what () << std::endl; throw ; } }); }; attempt (maxRetries).wait (); }
条件循环执行异步操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void useWhileDo () { using namespace std::chrono_literals; const auto start = std::chrono::steady_clock::now (); std::atomic<bool > done{false }; folly::whileDo([&]() { return !done && std::chrono::steady_clock::now () - start < 5 s; }, [&]() { return asyncOperation () .thenValue ([&](int value) { done = true ; std::cout << "Success value: " << value << std::endl << "Elapsed time: " << std::chrono::duration_cast <std::chrono::seconds>( std::chrono::steady_clock::now () - start) .count () << " seconds" << std::endl; }) .thenError <std::exception>([&](std::exception &&e) { std::cerr << "Operation failed: " << e.what () << std::endl; return folly::futures::sleep (500 ms); }); }) .then ([&](auto &&) { if (!done) { std::cout << "Operation did not succeed within timeout 5 seconds." << std::endl; } }) .wait (); }