.\" Man page generated from reStructuredText. . . .nr rst2man-indent-level 0 . .de1 rstReportMargin \\$1 \\n[an-margin] level \\n[rst2man-indent-level] level margin: \\n[rst2man-indent\\n[rst2man-indent-level]] - \\n[rst2man-indent0] \\n[rst2man-indent1] \\n[rst2man-indent2] .. .de1 INDENT .\" .rstReportMargin pre: . RS \\$1 . nr rst2man-indent\\n[rst2man-indent-level] \\n[an-margin] . nr rst2man-indent-level +1 .\" .rstReportMargin post: .. .de UNINDENT . RE .\" indent \\n[an-margin] .\" old: \\n[rst2man-indent\\n[rst2man-indent-level]] .nr rst2man-indent-level -1 .\" new: \\n[rst2man-indent\\n[rst2man-indent-level]] .in \\n[rst2man-indent\\n[rst2man-indent-level]]u .. .TH "MONGOC_CHANGE_STREAM_T" "3" "May 07, 2024" "1.27.1" "libmongoc" .SH SYNOPSIS .INDENT 0.0 .INDENT 3.5 .sp .EX #include typedef struct _mongoc_change_stream_t mongoc_change_stream_t; .EE .UNINDENT .UNINDENT .sp \fI\%mongoc_change_stream_t\fP is a handle to a change stream. A collection change stream can be obtained using \fI\%mongoc_collection_watch()\fP\&. .sp It is recommended to use a \fI\%mongoc_change_stream_t\fP and its functions instead of a raw aggregation with a \fB$changeStream\fP stage. For more information see the \fI\%MongoDB Manual Entry on Change Streams\fP\&. .SH EXAMPLE .sp example\-collection\-watch.c .INDENT 0.0 .INDENT 3.5 .sp .EX #include int main (void) { bson_t empty = BSON_INITIALIZER; const bson_t *doc; bson_t *to_insert = BCON_NEW (\(dqx\(dq, BCON_INT32 (1)); const bson_t *err_doc; bson_error_t error; const char *uri_string; mongoc_uri_t *uri; mongoc_client_t *client; mongoc_collection_t *coll; mongoc_change_stream_t *stream; mongoc_write_concern_t *wc = mongoc_write_concern_new (); bson_t opts = BSON_INITIALIZER; bool r; mongoc_init (); uri_string = \(dqmongodb://\(dq \(dqlocalhost:27017,localhost:27018,localhost:\(dq \(dq27019/db?replicaSet=rs0\(dq; uri = mongoc_uri_new_with_error (uri_string, &error); if (!uri) { fprintf (stderr, \(dqfailed to parse URI: %s\en\(dq \(dqerror message: %s\en\(dq, uri_string, error.message); return EXIT_FAILURE; } client = mongoc_client_new_from_uri (uri); if (!client) { return EXIT_FAILURE; } coll = mongoc_client_get_collection (client, \(dqdb\(dq, \(dqcoll\(dq); stream = mongoc_collection_watch (coll, &empty, NULL); mongoc_write_concern_set_wmajority (wc, 10000); mongoc_write_concern_append (wc, &opts); r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error); if (!r) { fprintf (stderr, \(dqError: %s\en\(dq, error.message); return EXIT_FAILURE; } while (mongoc_change_stream_next (stream, &doc)) { char *as_json = bson_as_relaxed_extended_json (doc, NULL); fprintf (stderr, \(dqGot document: %s\en\(dq, as_json); bson_free (as_json); } if (mongoc_change_stream_error_document (stream, &error, &err_doc)) { if (!bson_empty (err_doc)) { fprintf (stderr, \(dqServer Error: %s\en\(dq, bson_as_relaxed_extended_json (err_doc, NULL)); } else { fprintf (stderr, \(dqClient Error: %s\en\(dq, error.message); } return EXIT_FAILURE; } bson_destroy (to_insert); mongoc_write_concern_destroy (wc); bson_destroy (&opts); mongoc_change_stream_destroy (stream); mongoc_collection_destroy (coll); mongoc_uri_destroy (uri); mongoc_client_destroy (client); mongoc_cleanup (); return EXIT_SUCCESS; } .EE .UNINDENT .UNINDENT .SS Starting and Resuming .sp All \fBwatch\fP functions accept several options to indicate where a change stream should start returning changes from: \fBresumeAfter\fP, \fBstartAfter\fP, and \fBstartAtOperationTime\fP\&. 
.sp
All changes returned by \fI\%mongoc_change_stream_next()\fP include a resume
token in the \fB_id\fP field. MongoDB 4.2 and newer also include an additional
resume token in each \(dqaggregate\(dq and \(dqgetMore\(dq command response,
which points to the end of that response\(aqs batch. The current token is
automatically cached by libmongoc. In the event of an error, libmongoc
attempts to recreate the change stream starting where it left off by passing
the cached resume token. libmongoc only attempts to resume once, but client
applications can access the cached resume token with
\fI\%mongoc_change_stream_get_resume_token()\fP and use it for their own
resume logic by passing it as either the \fBresumeAfter\fP or \fBstartAfter\fP
option.
.sp
Additionally, change streams can start returning changes at an operation time
by using the \fBstartAtOperationTime\fP field. This can be the timestamp
returned in the \fBoperationTime\fP field of a command reply.
.sp
\fBresumeAfter\fP, \fBstartAfter\fP, and \fBstartAtOperationTime\fP are
mutually exclusive options. Setting more than one will result in a server
error.
.sp
The following example implements custom resume logic, persisting the resume
token in a file.
.sp
example\-resume.c
.INDENT 0.0
.INDENT 3.5
.sp
.EX
#include <mongoc/mongoc.h>

/* An example implementation of custom resume logic in a change stream.
 * example\-resume starts a client\-wide change stream and persists the resume
 * token in a file \(dqresume\-token.json\(dq. On restart, if \(dqresume\-token.json\(dq
 * exists, the change stream starts watching after the persisted resume token.
 *
 * This behavior allows a user to exit example\-resume, and restart it later
 * without missing any change events. */

#include <unistd.h>

static const char *RESUME_TOKEN_PATH = \(dqresume\-token.json\(dq;

static bool
_save_resume_token (const bson_t *doc)
{
   FILE *file_stream;
   bson_iter_t iter;
   bson_t resume_token_doc;
   char *as_json = NULL;
   size_t as_json_len;
   size_t r, n_written;
   const bson_value_t *resume_token;

   if (!bson_iter_init_find (&iter, doc, \(dq_id\(dq)) {
      fprintf (stderr, \(dqchange document does not contain an _id resume token.\en\(dq);
      return false;
   }
   resume_token = bson_iter_value (&iter);
   /* store the resume token in a document, { resumeAfter: <resume token> },
    * which we can later append easily. */
   file_stream = fopen (RESUME_TOKEN_PATH, \(dqw+\(dq);
   if (!file_stream) {
      fprintf (stderr, \(dqfailed to open %s for writing\en\(dq, RESUME_TOKEN_PATH);
      return false;
   }
   bson_init (&resume_token_doc);
   BSON_APPEND_VALUE (&resume_token_doc, \(dqresumeAfter\(dq, resume_token);
   as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
   bson_destroy (&resume_token_doc);
   n_written = 0;
   while (n_written < as_json_len) {
      r = fwrite ((void *) (as_json + n_written),
                  sizeof (char),
                  as_json_len \- n_written,
                  file_stream);
      if (r == 0) {
         /* fwrite returns a short item count (0 here) on error. */
         fprintf (stderr, \(dqfailed to write to %s\en\(dq, RESUME_TOKEN_PATH);
         bson_free (as_json);
         fclose (file_stream);
         return false;
      }
      n_written += r;
   }

   bson_free (as_json);
   fclose (file_stream);
   return true;
}

bool
_load_resume_token (bson_t *opts)
{
   bson_error_t error;
   bson_json_reader_t *reader;
   bson_t doc;

   /* if the file does not exist, skip. */
   if (\-1 == access (RESUME_TOKEN_PATH, R_OK)) {
      return true;
   }

   reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
   if (!reader) {
      fprintf (stderr,
               \(dqfailed to open %s for reading: %s\en\(dq,
               RESUME_TOKEN_PATH,
               error.message);
      return false;
   }

   bson_init (&doc);
   if (\-1 == bson_json_reader_read (reader, &doc, &error)) {
      fprintf (stderr, \(dqfailed to read doc from %s\en\(dq, RESUME_TOKEN_PATH);
      bson_destroy (&doc);
      bson_json_reader_destroy (reader);
      return false;
   }

   printf (\(dqfound cached resume token in %s, resuming change stream.\en\(dq,
           RESUME_TOKEN_PATH);

   bson_concat (opts, &doc);
   bson_destroy (&doc);
   bson_json_reader_destroy (reader);
   return true;
}

int
main (void)
{
   int exit_code = EXIT_FAILURE;
   const char *uri_string;
   mongoc_uri_t *uri = NULL;
   bson_error_t error;
   mongoc_client_t *client = NULL;
   bson_t pipeline = BSON_INITIALIZER;
   bson_t opts = BSON_INITIALIZER;
   mongoc_change_stream_t *stream = NULL;
   const bson_t *doc;
   const int max_time = 30; /* max amount of time, in seconds, that
                               mongoc_change_stream_next can block. */

   mongoc_init ();

   uri_string = \(dqmongodb://localhost:27017/db?replicaSet=rs0\(dq;
   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               \(dqfailed to parse URI: %s\en\(dq
               \(dqerror message: %s\en\(dq,
               uri_string,
               error.message);
      goto cleanup;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      goto cleanup;
   }

   if (!_load_resume_token (&opts)) {
      goto cleanup;
   }
   BSON_APPEND_INT64 (&opts, \(dqmaxAwaitTimeMS\(dq, max_time * 1000);

   printf (\(dqlistening for changes on the client (max %d seconds).\en\(dq,
           max_time);
   stream = mongoc_client_watch (client, &pipeline, &opts);

   while (mongoc_change_stream_next (stream, &doc)) {
      char *as_json;

      as_json = bson_as_canonical_extended_json (doc, NULL);
      printf (\(dqchange received: %s\en\(dq, as_json);
      bson_free (as_json);
      if (!_save_resume_token (doc)) {
         goto cleanup;
      }
   }

   exit_code = EXIT_SUCCESS;

cleanup:
   mongoc_uri_destroy (uri);
   bson_destroy (&pipeline);
   bson_destroy (&opts);
   mongoc_change_stream_destroy (stream);
   mongoc_client_destroy (client);
   mongoc_cleanup ();
   return exit_code;
}
.EE
.UNINDENT
.UNINDENT
.sp
The following example shows using \fBstartAtOperationTime\fP to synchronize a
change stream with another operation.
.sp
example\-start\-at\-optime.c
.INDENT 0.0
.INDENT 3.5
.sp
.EX
/* An example of starting a change stream with startAtOperationTime. */
#include <mongoc/mongoc.h>

int
main (void)
{
   int exit_code = EXIT_FAILURE;
   const char *uri_string;
   mongoc_uri_t *uri = NULL;
   bson_error_t error;
   mongoc_client_t *client = NULL;
   mongoc_collection_t *coll = NULL;
   bson_t pipeline = BSON_INITIALIZER;
   bson_t opts = BSON_INITIALIZER;
   mongoc_change_stream_t *stream = NULL;
   bson_iter_t iter;
   const bson_t *doc;
   bson_value_t cached_operation_time = {0};
   int i;
   bool r;

   mongoc_init ();

   uri_string = \(dqmongodb://localhost:27017/db?replicaSet=rs0\(dq;
   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               \(dqfailed to parse URI: %s\en\(dq
               \(dqerror message: %s\en\(dq,
               uri_string,
               error.message);
      goto cleanup;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      goto cleanup;
   }

   /* insert five documents. */
   coll = mongoc_client_get_collection (client, \(dqdb\(dq, \(dqcoll\(dq);
   for (i = 0; i < 5; i++) {
      bson_t reply;
      bson_t *insert_cmd = BCON_NEW (\(dqinsert\(dq,
                                     \(dqcoll\(dq,
                                     \(dqdocuments\(dq,
                                     \(dq[\(dq,
                                     \(dq{\(dq,
                                     \(dqx\(dq,
                                     BCON_INT64 (i),
                                     \(dq}\(dq,
                                     \(dq]\(dq);

      r = mongoc_collection_write_command_with_opts (
         coll, insert_cmd, NULL, &reply, &error);
      bson_destroy (insert_cmd);
      if (!r) {
         bson_destroy (&reply);
         fprintf (stderr, \(dqfailed to insert: %s\en\(dq, error.message);
         goto cleanup;
      }
      if (i == 0) {
         /* cache the operation time in the first reply. */
         if (bson_iter_init_find (&iter, &reply, \(dqoperationTime\(dq)) {
            bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
         } else {
            fprintf (stderr, \(dqreply does not contain operationTime.\en\(dq);
            bson_destroy (&reply);
            goto cleanup;
         }
      }
      bson_destroy (&reply);
   }

   /* start a change stream at the first returned operationTime. */
   BSON_APPEND_VALUE (&opts, \(dqstartAtOperationTime\(dq, &cached_operation_time);
   stream = mongoc_collection_watch (coll, &pipeline, &opts);

   /* since the change stream started at the operation time of the first
    * insert, the five inserts are returned. */
   printf (\(dqlistening for changes on db.coll:\en\(dq);
   while (mongoc_change_stream_next (stream, &doc)) {
      char *as_json;

      as_json = bson_as_canonical_extended_json (doc, NULL);
      printf (\(dqchange received: %s\en\(dq, as_json);
      bson_free (as_json);
   }

   exit_code = EXIT_SUCCESS;

cleanup:
   mongoc_uri_destroy (uri);
   bson_destroy (&pipeline);
   bson_destroy (&opts);
   if (cached_operation_time.value_type) {
      bson_value_destroy (&cached_operation_time);
   }
   mongoc_change_stream_destroy (stream);
   mongoc_collection_destroy (coll);
   mongoc_client_destroy (client);
   mongoc_cleanup ();
   return exit_code;
}
.EE
.UNINDENT
.UNINDENT
.SH AUTHOR
MongoDB, Inc
.SH COPYRIGHT
2017\-present, MongoDB, Inc
.\" Generated by docutils manpage writer.
.