Line data Source code
1 : //
2 : // Copyright (c) 2019 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2025 Mohammad Nejati
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/http
9 : //
10 :
11 : #ifndef BOOST_HTTP_SERIALIZER_HPP
12 : #define BOOST_HTTP_SERIALIZER_HPP
13 :
14 : #include <boost/http/config.hpp>
15 : #include <boost/http/detail/workspace.hpp>
16 : #include <boost/http/error.hpp>
17 :
18 : #include <boost/capy/buffers.hpp>
19 : #include <boost/capy/buffers/buffer_pair.hpp>
20 : #include <boost/capy/concept/buffer_sink.hpp>
21 : #include <boost/capy/concept/write_stream.hpp>
22 : #include <boost/capy/io_task.hpp>
23 : #include <boost/core/span.hpp>
24 : #include <boost/system/result.hpp>
25 :
26 : #include <cstddef>
27 : #include <cstring>
28 : #include <type_traits>
29 : #include <utility>
30 :
31 : namespace boost {
32 : namespace http {
33 :
34 : // Forward declaration
35 : class message_base;
36 :
37 : //------------------------------------------------
38 :
39 : /** A serializer for HTTP/1 messages.
40 :
41 : Transforms one or more HTTP/1 messages into bytes for
42 : transmission. Each message consists of a required header
43 : followed by an optional body.
44 :
45 : Use @ref set_message to associate a message, then choose
46 : a body mode:
47 :
48 : @li @ref start — empty body (header only)
49 : @li @ref start_writes — body via internal buffer
50 : (BufferSink path)
51 : @li @ref start_buffers — body via caller-owned buffers
52 : (WriteSink path)
53 :
54 : Alternatively, obtain a @ref sink via @ref sink_for and
55 : let it start the serializer lazily on first use.
56 :
57 : The caller must ensure that the associated message is not
58 : changed or destroyed until @ref is_done returns true,
59 : @ref reset is called, or the serializer is destroyed.
60 :
61 : @par Example
62 : @code
63 : http::serializer sr(cfg);
64 : http::response res;
65 : res.set_payload_size(5);
66 : sr.set_message(res);
67 :
68 : auto sink = sr.sink_for(socket);
69 : co_await sink.write_eof(
70 : capy::make_buffer(std::string_view("hello")));
71 : @endcode
72 :
73 : @see @ref sink, @ref set_message.
74 : */
75 : class serializer
76 : {
77 : public:
78 : template<capy::WriteStream Stream>
79 : class sink;
80 :
81 : /** The type used to represent a sequence
82 : of mutable buffers for streaming.
83 : */
84 : using mutable_buffers_type =
85 : capy::mutable_buffer_pair;
86 :
87 : /** The type used to represent a sequence of
88 : constant buffers that refers to the output
89 : area.
90 : */
91 : using const_buffers_type =
92 : boost::span<capy::const_buffer const>;
93 :
94 : /** Destructor
95 : */
96 : BOOST_HTTP_DECL
97 : ~serializer();
98 :
99 : /** Default constructor.
100 :
101 : Constructs a serializer with no allocated state.
102 : The serializer must be assigned from a valid
103 : serializer before use.
104 :
105 : @par Postconditions
106 : The serializer has no allocated state.
107 : */
108 2 : serializer() = default;
109 :
110 : /** Constructor.
111 :
112 : Constructs a serializer with the provided configuration.
113 :
114 : @par Postconditions
115 : @code
116 : this->is_done() == true
117 : @endcode
118 :
119 : @param cfg Shared pointer to serializer configuration.
120 :
121 : @see @ref make_serializer_config, @ref serializer_config.
122 : */
123 : BOOST_HTTP_DECL
124 : explicit
125 : serializer(
126 : std::shared_ptr<serializer_config_impl const> cfg);
127 :
128 : /** Constructor.
129 :
130 : The states of `other` are transferred
131 : to the newly constructed object,
132 : which includes the allocated buffer.
133 : After construction, the only valid
134 : operations on the moved-from object
135 : are destruction and assignment.
136 :
137 : Buffer sequences previously obtained
138 : using @ref prepare remain valid.
139 :
140 : @par Postconditions
141 : @code
142 : other.is_done() == true
143 : @endcode
144 :
145 : @par Complexity
146 : Constant.
147 :
148 : @param other The serializer to move from.
149 : */
150 : BOOST_HTTP_DECL
151 : serializer(
152 : serializer&& other) noexcept;
153 :
154 : /** Assignment.
155 : The states of `other` are transferred
156 : to this object, which includes the
157 : allocated buffer. After assignment,
158 : the only valid operations on the
159 : moved-from object are destruction and
160 : assignment.
161 : Buffer sequences previously obtained
162 : using @ref prepare remain valid.
163 : @par Complexity
164 : Constant.
165 : @param other The serializer to move from.
166 : @return A reference to this object.
167 : */
168 : BOOST_HTTP_DECL
169 : serializer&
170 : operator=(serializer&& other) noexcept;
171 :
172 : /** Reset the serializer for a new message.
173 :
174 : Aborts any ongoing serialization and
175 : prepares the serializer to start
176 : serialization of a new message.
177 : */
178 : BOOST_HTTP_DECL
179 : void
180 : reset() noexcept;
181 :
182 : /** Set the message to serialize.
183 :
184 : Associates a message with the serializer for subsequent
185 : streaming operations. The message is not copied; the caller
186 : must ensure it remains valid until serialization completes.
187 :
188 : @param m The message to associate.
189 : */
190 : BOOST_HTTP_DECL
191 : void
192 : set_message(message_base const& m) noexcept;
193 :
194 : /** Start serializing the associated message with an empty body.
195 :
196 : The message must be set beforehand using @ref set_message.
197 : Use the prepare/consume loop to pull output bytes.
198 :
199 : @par Preconditions
200 : A message was associated via @ref set_message.
201 :
202 : @par Exception Safety
203 : Strong guarantee.
204 :
205 : @throw std::logic_error if no message is associated or
206 : `this->is_done() == false`.
207 :
208 : @throw std::length_error if there is insufficient internal buffer
209 : space to start the operation.
210 :
211 : @see @ref set_message, @ref prepare, @ref consume.
212 : */
213 : void
214 : BOOST_HTTP_DECL
215 : start();
216 :
217 : /** Start streaming the associated message.
218 :
219 : Low-level entry point equivalent to @ref start_writes.
220 : Prefer using a @ref sink which starts lazily.
221 :
222 : @par Preconditions
223 : A message was associated via @ref set_message.
224 :
225 : @par Exception Safety
226 : Strong guarantee.
227 :
228 : @throw std::logic_error if no message is associated or
229 : `this->is_done() == false`.
230 :
231 : @throw std::length_error if there is insufficient internal buffer
232 : space to start the operation.
233 :
234 : @see @ref start_writes, @ref sink.
235 : */
236 : BOOST_HTTP_DECL
237 : void
238 : start_stream();
239 :
240 : /** Start the serializer in write mode.
241 :
242 : Prepares the serializer for write-mode streaming
243 : using the message previously set via @ref set_message.
244 : In this mode, the workspace is split into an input
245 : buffer and an output buffer. Use @ref stream_prepare,
246 : @ref stream_commit, and @ref stream_close to write
247 : body data, or use the sink's BufferSink interface.
248 :
249 : @par Preconditions
250 : A message was associated via @ref set_message.
251 : @code
252 : this->is_done() == true
253 : @endcode
254 :
255 : @par Exception Safety
256 : Strong guarantee.
257 :
258 : @throw std::logic_error if no message is associated.
259 :
260 : @throw std::length_error if there is insufficient internal buffer
261 : space to start the operation.
262 :
263 : @see @ref set_message, @ref sink.
264 : */
265 : BOOST_HTTP_DECL
266 : void
267 : start_writes();
268 :
269 : /** Start the serializer in buffer mode.
270 :
271 : Prepares the serializer for buffer-mode streaming
272 : using the message previously set via @ref set_message.
273 : In this mode, the entire workspace is used for output
274 : buffering. The caller provides body data through the
275 : sink's WriteSink methods (write, write_eof), passing
276 : their own buffers directly.
277 :
278 : @par Preconditions
279 : A message was associated via @ref set_message.
280 : @code
281 : this->is_done() == true
282 : @endcode
283 :
284 : @par Exception Safety
285 : Strong guarantee.
286 :
287 : @throw std::logic_error if no message is associated.
288 :
289 : @throw std::length_error if there is insufficient internal buffer
290 : space to start the operation.
291 :
292 : @see @ref set_message, @ref sink.
293 : */
294 : BOOST_HTTP_DECL
295 : void
296 : start_buffers();
297 :
298 : /** Create a sink for writing body data.
299 :
300 : Returns a lightweight @ref sink handle that writes
301 : serialized body data to the provided stream. The sink
302 : starts the serializer lazily on first use, so neither
303 : @ref start_writes nor @ref start_buffers need to be
304 : called beforehand.
305 :
306 : The sink can be created once and reused across multiple
307 : messages. The serializer must outlive the sink.
308 :
309 : @par Example
310 : @code
311 : http::serializer sr(cfg);
312 : auto sink = sr.sink_for(socket);
313 :
314 : http::response res;
315 : res.set_payload_size(5);
316 : sr.set_message(res);
317 : co_await sink.write_eof(
318 : capy::make_buffer(std::string_view("hello")));
319 : @endcode
320 :
321 : @tparam Stream The output stream type satisfying
322 : @ref capy::WriteStream.
323 :
324 : @param ws The output stream to write serialized data to.
325 :
326 : @return A @ref sink object for writing body data.
327 :
328 : @see @ref sink, @ref set_message.
329 : */
330 : template<capy::WriteStream Stream>
331 : sink<Stream>
332 : sink_for(Stream& ws) noexcept;
333 :
334 : /** Return the output area.
335 :
336 : This function serializes some or all of
337 : the message and returns the corresponding
338 : output buffers. Afterward, a call to @ref
339 : consume is required to report the number
340 : of bytes used, if any.
341 :
342 : If the message includes an
343 : `Expect: 100-continue` header and the
344 : header section of the message has been
345 : consumed, the returned result will contain
346 : @ref error::expect_100_continue to
347 : indicate that the header part of the
348 : message is complete. The next call to @ref
349 : prepare will produce output.
350 :
351 : When the serializer is in streaming mode,
352 : the result may contain @ref error::need_data
353 : to indicate that additional input is required
354 : to produce output.
355 :
356 : @par Preconditions
357 : @code
358 : this->is_done() == false
359 : @endcode
360 : No unrecoverable error reported from previous calls.
361 :
362 : @par Exception Safety
363 : Strong guarantee.
364 :
365 : @throw std::logic_error
366 : `this->is_done() == true`.
367 :
368 : @return A result containing @ref
369 : const_buffers_type that represents the
370 : output area or an error if any occurred.
371 :
372 : @see
373 : @ref consume,
374 : @ref is_done,
375 : @ref const_buffers_type.
376 : */
377 : BOOST_HTTP_DECL
378 : auto
379 : prepare() ->
380 : system::result<
381 : const_buffers_type>;
382 :
383 : /** Consume bytes from the output area.
384 :
385 : This function should be called after one
386 : or more bytes contained in the buffers
387 : provided in the prior call to @ref prepare
388 : have been used.
389 :
390 : After a call to @ref consume, callers
391 : should check the return value of @ref
392 : is_done to determine if the entire message
393 : has been serialized.
394 :
395 : @par Preconditions
396 : @code
397 : this->is_done() == false
398 : @endcode
399 :
400 : @par Exception Safety
401 : Strong guarantee.
402 :
403 : @throw std::logic_error
404 : `this->is_done() == true`.
405 :
406 : @param n The number of bytes to consume.
407 : If `n` is greater than the size of the
408 : buffer returned from @ref prepared the
409 : entire output sequence is consumed and no
410 : error is issued.
411 :
412 : @see
413 : @ref prepare,
414 : @ref is_done,
415 : @ref const_buffers_type.
416 : */
417 : BOOST_HTTP_DECL
418 : void
419 : consume(std::size_t n);
420 :
421 : /** Return true if serialization is complete.
422 : */
423 : BOOST_HTTP_DECL
424 : bool
425 : is_done() const noexcept;
426 :
427 : /** Return true if serialization has not yet started.
428 : */
429 : BOOST_HTTP_DECL
430 : bool
431 : is_start() const noexcept;
432 :
433 : /** Return the available capacity for streaming.
434 :
435 : Returns the number of bytes that can be written
436 : to the serializer's internal buffer.
437 :
438 : @par Preconditions
439 : The serializer is in streaming mode (after calling
440 : @ref start_stream).
441 :
442 : @par Exception Safety
443 : Strong guarantee.
444 :
445 : @throw std::logic_error if not in streaming mode.
446 : */
447 : BOOST_HTTP_DECL
448 : std::size_t
449 : stream_capacity() const;
450 :
451 : /** Prepare a buffer for writing stream data.
452 :
453 : Returns a mutable buffer sequence representing
454 : the writable bytes. Use @ref stream_commit to make the
455 : written data available to the serializer.
456 :
457 : All buffer sequences previously obtained
458 : using @ref stream_prepare are invalidated.
459 :
460 : @par Preconditions
461 : The serializer is in streaming mode.
462 :
463 : @par Exception Safety
464 : Strong guarantee.
465 :
466 : @return An instance of @ref mutable_buffers_type.
467 : The underlying memory is owned by the serializer.
468 :
469 : @throw std::logic_error if not in streaming mode.
470 :
471 : @see
472 : @ref stream_commit,
473 : @ref stream_capacity.
474 : */
475 : BOOST_HTTP_DECL
476 : mutable_buffers_type
477 : stream_prepare();
478 :
479 : /** Commit data to the serializer stream.
480 :
481 : Makes `n` bytes available to the serializer.
482 :
483 : All buffer sequences previously obtained
484 : using @ref stream_prepare are invalidated.
485 :
486 : @par Preconditions
487 : The serializer is in streaming mode and
488 : `n <= stream_capacity()`.
489 :
490 : @par Exception Safety
491 : Strong guarantee.
492 : Exceptions thrown on invalid input.
493 :
494 : @param n The number of bytes to commit.
495 :
496 : @throw std::invalid_argument if `n > stream_capacity()`.
497 :
498 : @throw std::logic_error if not in streaming mode.
499 :
500 : @see
501 : @ref stream_prepare,
502 : @ref stream_capacity.
503 : */
504 : BOOST_HTTP_DECL
505 : void
506 : stream_commit(std::size_t n);
507 :
508 : /** Close the stream.
509 :
510 : Notifies the serializer that the message body
511 : has ended. After calling this function, no more
512 : data can be written to the stream.
513 :
514 : @par Preconditions
515 : The serializer is in streaming mode.
516 :
517 : @par Postconditions
518 : The stream is closed.
519 : */
520 : BOOST_HTTP_DECL
521 : void
522 : stream_close() noexcept;
523 :
524 : private:
525 : class impl;
526 :
527 : BOOST_HTTP_DECL
528 : detail::workspace&
529 : ws();
530 :
531 : impl* impl_ = nullptr;
532 : };
533 :
534 : //------------------------------------------------
535 :
536 : /** A sink adapter for writing HTTP message bodies.
537 :
538 : Wraps a @ref serializer and a @ref capy::WriteStream to
539 : provide two interfaces for body writing:
540 :
541 : @li **BufferSink** (@ref prepare / @ref commit /
542 : @ref commit_eof) — write directly into the serializer's
543 : internal buffer (zero-copy). Triggers @ref start_writes
544 : lazily.
545 : @li **WriteSink** (@ref write / @ref write_eof) — pass
546 : caller-owned buffers; the sink copies data through the
547 : serializer. Triggers @ref start_buffers lazily.
548 :
549 : Both interfaces handle chunked framing, compression, and
550 : Content-Length validation automatically.
551 :
552 : The sink is a lightweight handle that can be created once
553 : and reused across multiple messages. The serializer and
554 : stream must outlive the sink.
555 :
556 : @tparam Stream The underlying stream type satisfying
557 : @ref capy::WriteStream.
558 :
559 : @par Thread Safety
560 : Distinct objects: Safe.
561 : Shared objects: Unsafe.
562 :
563 : @par Example
564 : @code
565 : capy::task<>
566 : send_response(capy::WriteStream auto& socket)
567 : {
568 : http::serializer sr(cfg);
569 : auto sink = sr.sink_for(socket);
570 :
571 : http::response res;
572 : res.set_payload_size(5);
573 : sr.set_message(res);
574 :
575 : // WriteSink: pass your own buffer
576 : co_await sink.write_eof(
577 : capy::make_buffer(std::string_view("hello")));
578 : }
579 : @endcode
580 :
581 : @see @ref capy::BufferSink, @ref capy::any_buffer_sink,
582 : @ref serializer.
583 : */
584 : template<capy::WriteStream Stream>
585 : class serializer::sink
586 : {
587 : Stream* stream_ = nullptr;
588 : serializer* sr_ = nullptr;
589 :
590 : public:
591 : /** Constructor.
592 :
593 : A default-constructed sink is in an empty state.
594 : */
595 : sink() noexcept = default;
596 :
597 : /** Constructor.
598 :
599 : @param stream The underlying stream to write serialized data to.
600 : @param sr The serializer performing HTTP framing.
601 : */
602 209 : sink(
603 : Stream& stream,
604 : serializer& sr) noexcept
605 209 : : stream_(&stream)
606 209 : , sr_(&sr)
607 : {
608 209 : }
609 :
610 : /** Prepare writable buffers.
611 :
612 : Fills the provided span with mutable buffer descriptors
613 : pointing to the serializer's internal storage. This
614 : operation is synchronous. Lazily starts the serializer
615 : in write mode if not already started.
616 :
617 : @param dest Span of mutable_buffer to fill.
618 :
619 : @return A span of filled buffers.
620 : */
621 : std::span<capy::mutable_buffer>
622 94 : prepare(std::span<capy::mutable_buffer> dest)
623 : {
624 94 : if(sr_->is_start())
625 88 : sr_->start_writes();
626 94 : auto bufs = sr_->stream_prepare();
627 94 : std::size_t count = 0;
628 188 : for(auto const& b : bufs)
629 : {
630 188 : if(count >= dest.size() || b.size() == 0)
631 94 : break;
632 94 : dest[count++] = b;
633 : }
634 188 : return dest.first(count);
635 : }
636 :
637 : /** Commit bytes written to the prepared buffers.
638 :
639 : Commits `n` bytes written to the buffers returned by the
640 : most recent call to @ref prepare. The operation flushes
641 : serialized output to the underlying stream.
642 :
643 : @param n The number of bytes to commit.
644 :
645 : @return An awaitable yielding `(error_code)`.
646 : */
647 : auto
648 94 : commit(std::size_t n)
649 : -> capy::io_task<>
650 : {
651 : if(sr_->is_start())
652 : sr_->start_writes();
653 : sr_->stream_commit(n);
654 :
655 : while(!sr_->is_done())
656 : {
657 : auto cbs = sr_->prepare();
658 : if(cbs.has_error())
659 : {
660 : if(cbs.error() == error::need_data)
661 : break;
662 : co_return {cbs.error()};
663 : }
664 :
665 : if(capy::buffer_empty(*cbs))
666 : {
667 : // advance state machine
668 : sr_->consume(0);
669 : continue;
670 : }
671 :
672 : auto [ec, written] = co_await stream_->write_some(*cbs);
673 : sr_->consume(written);
674 :
675 : if(ec)
676 : co_return {ec};
677 : }
678 :
679 : co_return {};
680 188 : }
681 :
682 : /** Commit final bytes and signal end-of-stream.
683 :
684 : Commits `n` bytes written to the buffers returned by the
685 : most recent call to @ref prepare and closes the body stream,
686 : flushing any remaining serializer output to the underlying
687 : stream. For chunked encoding, this writes the final
688 : zero-length chunk.
689 :
690 : @param n The number of bytes to commit.
691 :
692 : @return An awaitable yielding `(error_code)`.
693 :
694 : @post The serializer's `is_done()` returns `true` on success.
695 : */
696 : auto
697 108 : commit_eof(std::size_t n)
698 : -> capy::io_task<>
699 : {
700 : if(sr_->is_start())
701 : sr_->start_writes();
702 : sr_->stream_commit(n);
703 : sr_->stream_close();
704 :
705 : while(!sr_->is_done())
706 : {
707 : auto cbs = sr_->prepare();
708 : if(cbs.has_error())
709 : {
710 : if(cbs.error() == error::need_data)
711 : continue;
712 : co_return {cbs.error()};
713 : }
714 :
715 : if(capy::buffer_empty(*cbs))
716 : {
717 : // advance state machine
718 : sr_->consume(0);
719 : continue;
720 : }
721 :
722 : auto [ec, written] = co_await stream_->write_some(*cbs);
723 : sr_->consume(written);
724 :
725 : if(ec)
726 : co_return {ec};
727 : }
728 :
729 : co_return {};
730 216 : }
731 :
732 : /** Write body data from caller-owned buffers.
733 :
734 : Lazily starts the serializer in buffer mode if not
735 : already started. Writes all data from the provided
736 : buffers through the serializer to the underlying stream.
737 :
738 : @param buffers The buffer sequence containing body data.
739 :
740 : @return An awaitable yielding `(error_code, std::size_t)`.
741 : The size_t is the total number of body bytes written.
742 : */
743 : template<class ConstBufferSequence>
744 : auto
745 72 : write(ConstBufferSequence const& buffers)
746 : -> capy::io_task<std::size_t>
747 : {
748 : if(sr_->is_start())
749 : sr_->start_buffers();
750 :
751 : // Drain header first
752 : while(!sr_->is_done())
753 : {
754 : auto cbs = sr_->prepare();
755 : if(cbs.has_error())
756 : {
757 : if(cbs.error() == error::need_data)
758 : break;
759 : co_return {cbs.error(), 0};
760 : }
761 :
762 : if(capy::buffer_empty(*cbs))
763 : {
764 : // advance state machine
765 : sr_->consume(0);
766 : continue;
767 : }
768 :
769 : auto [ec, written] = co_await stream_->write_some(*cbs);
770 : sr_->consume(written);
771 :
772 : if(ec)
773 : co_return {ec, 0};
774 : }
775 :
776 : // Write body data through stream_prepare/commit
777 : std::size_t total = 0;
778 : for(auto it = capy::begin(buffers);
779 : it != capy::end(buffers); ++it)
780 : {
781 : capy::const_buffer src = *it;
782 : while(src.size() != 0)
783 : {
784 : auto mbp = sr_->stream_prepare();
785 : std::size_t copied = 0;
786 : for(auto const& mb : mbp)
787 : {
788 : auto chunk = (std::min)(
789 : mb.size(), src.size());
790 : if(chunk == 0)
791 : break;
792 : std::memcpy(mb.data(),
793 : src.data(), chunk);
794 : src += chunk;
795 : copied += chunk;
796 : }
797 : sr_->stream_commit(copied);
798 : total += copied;
799 :
800 : // Drain output
801 : while(!sr_->is_done())
802 : {
803 : auto cbs = sr_->prepare();
804 : if(cbs.has_error())
805 : {
806 : if(cbs.error() == error::need_data)
807 : break;
808 : co_return {cbs.error(), total};
809 : }
810 :
811 : if(capy::buffer_empty(*cbs))
812 : {
813 : // advance state machine
814 : sr_->consume(0);
815 : continue;
816 : }
817 :
818 : auto [ec, written] =
819 : co_await stream_->write_some(*cbs);
820 : sr_->consume(written);
821 :
822 : if(ec)
823 : co_return {ec, total};
824 : }
825 : }
826 : }
827 :
828 : co_return {{}, total};
829 144 : }
830 :
831 : /** Write final body data and signal end-of-stream.
832 :
833 : Lazily starts the serializer in buffer mode if not
834 : already started. Writes all data from the provided
835 : buffers and then closes the body stream, flushing
836 : any remaining output to the underlying stream.
837 :
838 : @param buffers The buffer sequence containing final body data.
839 :
840 : @return An awaitable yielding `(error_code, std::size_t)`.
841 : The size_t is the total number of body bytes written.
842 :
843 : @post The serializer's `is_done()` returns `true` on success.
844 : */
845 : template<class ConstBufferSequence>
846 : auto
847 44 : write_eof(ConstBufferSequence const& buffers)
848 : -> capy::io_task<std::size_t>
849 : {
850 : auto [ec, n] = co_await write(buffers);
851 : if(ec)
852 : co_return {ec, n};
853 :
854 : sr_->stream_close();
855 :
856 : while(!sr_->is_done())
857 : {
858 : auto cbs = sr_->prepare();
859 : if(cbs.has_error())
860 : {
861 : if(cbs.error() == error::need_data)
862 : continue;
863 : co_return {cbs.error(), n};
864 : }
865 :
866 : if(capy::buffer_empty(*cbs))
867 : {
868 : // advance state machine
869 : sr_->consume(0);
870 : continue;
871 : }
872 :
873 : auto [ec2, written] = co_await stream_->write_some(*cbs);
874 : sr_->consume(written);
875 :
876 : if(ec2)
877 : co_return {ec2, n};
878 : }
879 :
880 : co_return {{}, n};
881 88 : }
882 :
883 : /** Signal end-of-stream with no additional data.
884 :
885 : Lazily starts the serializer in buffer mode if not
886 : already started. Closes the body stream and flushes
887 : any remaining output to the underlying stream.
888 :
889 : @return An awaitable yielding `(error_code)`.
890 :
891 : @post The serializer's `is_done()` returns `true` on success.
892 : */
893 : auto
894 16 : write_eof()
895 : -> capy::io_task<>
896 : {
897 : if(sr_->is_start())
898 : sr_->start_buffers();
899 :
900 : sr_->stream_close();
901 :
902 : while(!sr_->is_done())
903 : {
904 : auto cbs = sr_->prepare();
905 : if(cbs.has_error())
906 : {
907 : if(cbs.error() == error::need_data)
908 : continue;
909 : co_return {cbs.error()};
910 : }
911 :
912 : if(capy::buffer_empty(*cbs))
913 : {
914 : // advance state machine
915 : sr_->consume(0);
916 : continue;
917 : }
918 :
919 : auto [ec, written] = co_await stream_->write_some(*cbs);
920 : sr_->consume(written);
921 :
922 : if(ec)
923 : co_return {ec};
924 : }
925 :
926 : co_return {};
927 32 : }
928 : };
929 :
930 : //------------------------------------------------
931 :
932 : template<capy::WriteStream Stream>
933 : serializer::sink<Stream>
934 209 : serializer::sink_for(Stream& ws) noexcept
935 : {
936 209 : return sink<Stream>(ws, *this);
937 : }
938 :
939 : } // http
940 : } // boost
941 :
942 : #endif
|